"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const bunyan_1 = __importDefault(require("@expo/bunyan"));
const loosely_validate_event_1 = __importDefault(require("@segment/loosely-validate-event"));
const assert_1 = __importDefault(require("assert"));
const fetch_retry_1 = __importDefault(require("fetch-retry"));
const md5_1 = __importDefault(require("md5"));
const node_fetch_1 = __importDefault(require("node-fetch"));
const remove_trailing_slash_1 = __importDefault(require("remove-trailing-slash"));
const uuid_1 = require("uuid");
const version = require('./package.json').version;
const retryableFetch = (0, fetch_retry_1.default)(node_fetch_1.default);
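// Prefer the global setImmediate when available; otherwise fall back to process.nextTick
// so callbacks are still deferred to a later turn of the event loop.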
const setImmediate = global.setImmediate || process.nextTick.bind(process);
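/**
* A Node.js client that records analytics events, batches them in memory, and delivers
* them to a RudderStack data plane over HTTP.
*/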
class Analytics {
/**
* Initialize a new `Analytics` instance with your RudderStack project's `writeKey` and an
* optional dictionary of options.
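* Supported options are `enable`, `timeout`, `flushAt`, `flushInterval`,
* `maxFlushSizeInBytes`, `maxQueueLength`, and `logLevel`.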
*/
constructor(writeKey, dataPlaneURL, { enable = true, timeout = 0, flushAt = 20, flushInterval = 20000, maxFlushSizeInBytes = 1024 * 1000 * 3.9, // defaults to ~3.9 MB
maxQueueLength = 1000, logLevel = bunyan_1.default.FATAL, } = {}) {
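// Internal state: the promise for an in-flight flush, the buffered message queue,
// callbacks and responses accumulated across rolled-up flush() calls, and the flush timer.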
this.inFlightFlush = null;
this.queue = [];
this.flushCallbacks = [];
this.flushResponses = [];
this.finalMessageId = null;
this.flushed = false;
this.timer = null;
this.enable = enable;
(0, assert_1.default)(writeKey, `The project's write key must be specified`);
(0, assert_1.default)(dataPlaneURL, `The data plane URL must be specified`);
this.writeKey = writeKey;
this.host = (0, remove_trailing_slash_1.default)(dataPlaneURL);
this.timeout = timeout;
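// Clamp flushAt to at least 1 so every enqueued message eventually triggers a flush.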
this.flushAt = Math.max(flushAt, 1);
this.flushInterval = flushInterval;
this.maxFlushSizeInBytes = maxFlushSizeInBytes;
this.maxQueueLength = maxQueueLength;
this.logger = bunyan_1.default.createLogger({
name: '@expo/rudder-node-sdk',
level: logLevel,
});
}
/**
* Sends an "identify" message that associates traits with a user.
*/
identify(message, callback) {
this.validate(message, 'identify');
this.enqueue('identify', message, callback);
return this;
}
/**
* Sends a "group" message that identifies this user with a group.
*/
group(message, callback) {
this.validate(message, 'group');
this.enqueue('group', message, callback);
return this;
}
/**
* Sends a "track" event that records an action.
*/
track(message, callback) {
this.validate(message, 'track');
this.enqueue('track', message, callback);
return this;
}
/**
* Sends a "page" event that records a page view on a website.
*/
page(message, callback) {
this.validate(message, 'page');
this.enqueue('page', message, callback);
return this;
}
/**
* Sends a "screen" event that records a screen view in an app.
*/
screen(message, callback) {
this.validate(message, 'screen');
this.enqueue('screen', message, callback);
return this;
}
/**
* Sends an "alias" message that associates one ID with another.
*/
alias(message, callback) {
this.validate(message, 'alias');
this.enqueue('alias', message, callback);
return this;
}
validate(message, type) {
try {
(0, loosely_validate_event_1.default)(message, type);
}
catch (e) {
if (e.message === 'Your message must be < 32kb.') {
this.logger.warn('Your message must be < 32KiB. This is currently surfaced as a warning. Please update your code.', message);
return;
}
throw e;
}
}
/**
* Adds a message of the specified type to the queue and flushes the queue if appropriate.
*/
enqueue(type, message, callback = () => { }) {
var _a, _b;
if (!this.enable) {
setImmediate(callback);
return;
}
if (this.queue.length >= this.maxQueueLength) {
this.logger.error(`Not adding events for processing as queue size ${this.queue.length} exceeds max configuration ${this.maxQueueLength}`);
setImmediate(callback);
return;
}
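// For "identify" messages, mirror the top-level traits into context.traits.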
if (type === 'identify') {
(_a = message.traits) !== null && _a !== void 0 ? _a : (message.traits = {});
(_b = message.context) !== null && _b !== void 0 ? _b : (message.context = {});
message.context.traits = message.traits;
}
message = { ...message };
message.type = type;
message.context = {
library: {
name: '@expo/rudder-sdk-node',
version,
},
...message.context,
};
message._metadata = {
nodeVersion: process.versions.node,
...message._metadata,
};
if (!message.originalTimestamp) {
message.originalTimestamp = new Date();
}
if (!message.messageId) {
// We md5 the message to add more randomness. This is primarily meant
// for use in the browser where the uuid package falls back to Math.random()
// which is not a great source of randomness.
// Borrowed from analytics.js (https://github.com/segment-integrations/analytics.js-integration-segmentio/blob/a20d2a2d222aeb3ab2a8c7e72280f1df2618440e/lib/index.js#L255-L256).
message.messageId = `node-${(0, md5_1.default)(JSON.stringify(message))}-${(0, uuid_1.v4)()}`;
}
this.queue.push({ message, callback });
if (!this.flushed) {
this.flushed = true;
this.flush();
return;
}
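// After the initial flush, flush whenever the queue length reaches a multiple of
// flushAt; otherwise arm the interval timer so stragglers are eventually sent.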
const isDivisibleByFlushAt = this.queue.length % this.flushAt === 0;
if (isDivisibleByFlushAt) {
this.logger.debug(`flushAt reached, messageQueueLength is ${this.queue.length}, trying flush...`);
this.flush();
}
else if (this.flushInterval && !this.timer) {
// only start a timer if there are dangling items in the message queue
this.logger.debug('no existing flush timer, creating new one');
this.timer = setTimeout(this.flush.bind(this), this.flushInterval);
}
}
/**
* Flushes the message queue to the server immediately if a flush is not already in progress.
*/
async flush(callback = () => { }) {
this.logger.debug('in flush');
// will cause new messages to be rolled up into the in-flight flush
this.finalMessageId = this.queue.length
? this.queue[this.queue.length - 1].message.messageId
: null;
this.logger.trace('finalMessageId: ' + this.finalMessageId);
this.flushCallbacks.push(callback);
if (this.inFlightFlush) {
this.logger.debug('skipping flush, there is an in flight flush');
return await this.inFlightFlush;
}
this.inFlightFlush = this.executeFlush();
const flushResponse = await this.inFlightFlush;
this.logger.debug('resetting client flush state');
this.inFlightFlush = null;
this.finalMessageId = null;
this.logger.trace('===flushResponse===', flushResponse);
return flushResponse;
}
/**
* Flushes messages from the message queue to the server immediately. After the flush has finished,
* this checks for pending flushes and executes them. All data is rolled up into a single FlushResponse.
*/
async executeFlush(flushedItems = []) {
var _a;
this.logger.debug('in execute flush');
if (!this.enable) {
this.logger.debug('client not enabled, skipping flush');
this.flushResponses.splice(0, this.flushResponses.length);
const nullResponse = this.nullFlushResponse();
this.flushCallbacks
.splice(0, this.flushCallbacks.length)
.map((callback) => setImmediate(callback, nullResponse));
return nullResponse;
}
if (this.timer) {
this.logger.debug('cancelling existing timer...');
clearTimeout(this.timer);
this.timer = null;
}
if (!this.queue.length) {
this.logger.debug('queue is empty, nothing to flush');
this.flushResponses.splice(0, this.flushResponses.length);
const nullResponse = this.nullFlushResponse();
this.flushCallbacks
.splice(0, this.flushCallbacks.length)
.map((callback) => setImmediate(callback, nullResponse));
return nullResponse;
}
let flushSize = 0;
let spliceIndex = 0;
// guard against requests larger than maxFlushSizeInBytes (~3.9 MB by default)
for (let i = 0; i < this.queue.length; i++) {
const item = this.queue[i];
const itemSize = JSON.stringify(item).length;
const exceededMaxFlushSize = flushSize + itemSize > this.maxFlushSizeInBytes;
if (exceededMaxFlushSize) {
break;
}
flushSize += itemSize;
spliceIndex++;
if (((_a = item.message.messageId) !== null && _a !== void 0 ? _a : null) === this.finalMessageId || !this.finalMessageId) {
break; // guard against flushing items added to the message queue during this flush
}
}
const itemsToFlush = this.queue.splice(0, spliceIndex);
const callbacks = itemsToFlush.map((item) => item.callback);
const currentBatchOfMessages = itemsToFlush.map((item) => {
// defensive check in case the queue has been tampered with directly
if (typeof item.message == 'object') {
item.message.sentAt = new Date();
}
return item.message;
});
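// Invokes the per-message callbacks for this batch, then resolves every pending
// flush() caller with the responses accumulated so far.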
const done = (err) => {
callbacks.forEach((callback_) => {
callback_(err);
});
const flushResponses = this.flushResponses.slice(0, this.flushResponses.length);
this.flushCallbacks
.splice(0, this.flushCallbacks.length)
.map((callback) => setImmediate(callback, flushResponses));
};
const data = {
batch: currentBatchOfMessages,
sentAt: new Date(),
};
this.logger.debug('batch size is ' + itemsToFlush.length);
this.logger.trace('===data===', data);
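// Upload the batch with the write key sent as the username of a Basic auth header,
// retrying transient failures per isErrorRetryable/getExponentialDelay.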
const req = {
method: 'POST',
headers: {
accept: 'application/json, text/plain, */*',
'content-type': 'application/json;charset=utf-8',
'user-agent': `expo-rudder-sdk-node/${version}`,
authorization: 'Basic ' + Buffer.from(`${this.writeKey}:`).toString('base64'),
},
body: JSON.stringify(data),
timeout: this.timeout > 0 ? this.timeout : undefined,
retryDelay: this.getExponentialDelay.bind(this),
retryOn: this.isErrorRetryable.bind(this),
};
let error = undefined;
try {
const response = await retryableFetch(`${this.host}`, req);
if (!response.ok) {
// handle 4xx 5xx errors
this.logger.error('request failed to send after 3 retries, dropping ' + itemsToFlush.length + ' events');
error = new Error(response.statusText);
}
}
catch (err) {
// handle network errors
this.logger.error('request failed to send after 3 retries, dropping ' + itemsToFlush.length + ' events');
error = err;
}
this.flushResponses.push({ error, data });
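// If this batch ended at the message that was last in the queue when flush() was
// called (or no such marker exists), the flush is complete; otherwise recurse to
// drain the messages that arrived while this request was in flight.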
const finishedFlushing = currentBatchOfMessages[currentBatchOfMessages.length - 1].messageId === this.finalMessageId ||
!this.finalMessageId;
if (finishedFlushing) {
if (error) {
done(error);
}
else {
done();
}
return this.flushResponses.splice(0, this.flushResponses.length);
}
callbacks.forEach((callback_) => {
callback_(error);
});
return await this.executeFlush(flushedItems.concat(itemsToFlush));
}
/**
* Calculates the amount of time to wait before retrying a request, given the number of prior
* retries (excluding the initial attempt).
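* For example, retry 0 waits 200-240 ms, retry 1 waits 400-480 ms, and retry 2 waits 800-960 ms.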
*
* @param priorRetryCount the number of prior retries, starting from zero
*/
getExponentialDelay(priorRetryCount) {
const delay = 2 ** priorRetryCount * 200;
const jitter = delay * 0.2 * Math.random(); // 0-20% of the delay
return delay + jitter;
}
/**
* Returns whether to retry a request that failed with the given error or returned the given
* response.
*/
isErrorRetryable(priorRetryCount, error, response) {
// 3 retries max
if (priorRetryCount > 2) {
return false;
}
return (
// Retry on any network error
!!error ||
// Retry if rate limited
response.status === 429 ||
// Retry on 5xx status codes due to server errors
(response.status >= 500 && response.status <= 599));
}
nullFlushResponse() {
return [
{
data: {
batch: [],
sentAt: new Date(),
},
},
];
}
}
exports.default = Analytics;
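// Example usage (a minimal sketch; the write key and data plane URL are placeholders,
// and the package is assumed to be published as '@expo/rudder-sdk-node'):
//
//   const Analytics = require('@expo/rudder-sdk-node').default;
//   const client = new Analytics('YOUR_WRITE_KEY', 'https://your-data-plane.example.com/v1/batch');
//   client.track({ userId: 'user-123', event: 'App Opened' });
//   client.flush();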
//# sourceMappingURL=index.js.map