commit c310cd7943e04d43fa46a6f86fd981e1574bfba1 Author: pauljako Date: Fri Jun 20 09:47:10 2025 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..849ddff --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +dist/ diff --git a/app-side/i18n/en-US.po b/app-side/i18n/en-US.po new file mode 100644 index 0000000..9eabe48 --- /dev/null +++ b/app-side/i18n/en-US.po @@ -0,0 +1,2 @@ +msgid "example" +msgstr "This is an example in app-side" \ No newline at end of file diff --git a/app-side/index.js b/app-side/index.js new file mode 100644 index 0000000..65285ff --- /dev/null +++ b/app-side/index.js @@ -0,0 +1,60 @@ +import { MessageBuilder } from "../shared/message-side"; + +const messageBuilder = new MessageBuilder(); + +async function fetchDirectory(ctx) { + try { + const res = await fetch({ + url: "https://directory.spaceapi.io", + method: "GET", + }); + const resBody = + typeof res.body === "string" ? JSON.parse(res.body) : res.body; + + ctx.response({ + data: { success: true, result: resBody }, + }); + } catch (error) { + ctx.response({ + data: { success: false, result: {} }, + }); + } +} + +async function fetchStatus(url, ctx) { + try { + const res = await fetch({ + url: url, + method: "GET", + }); + const resBody = + typeof res.body === "string" ? JSON.parse(res.body) : res.body; + + ctx.response({ + data: { success: true, result: resBody }, + }); + } catch (error) { + ctx.response({ + data: { success: false, result: {} }, + }); + } +} + +AppSideService({ + onInit() { + messageBuilder.listen(() => {}); + + messageBuilder.on("request", (ctx) => { + const jsonRpc = messageBuilder.buf2Json(ctx.request.payload); + if (jsonRpc.method === "GET_DIRECTORY") { + return fetchDirectory(ctx); + } else if (jsonRpc.method === "GET_STATUS") { + return fetchStatus(jsonRpc.url, ctx); + } + }); + }, + + onRun() {}, + + onDestroy() {}, +}); diff --git a/app.js b/app.js new file mode 100644 index 0000000..c45b06a --- /dev/null +++ b/app.js @@ -0,0 +1,28 @@ +import "./shared/device-polyfill"; +import { MessageBuilder } from "./shared/message"; +import { getPackageInfo } from "@zos/app"; +import * as ble from "@zos/ble"; + +App({ + globalData: { + messageBuilder: null, + }, + onCreate(options) { + console.log("app on create invoke"); + const { appId } = getPackageInfo(); + const messageBuilder = new MessageBuilder({ + appId, + appDevicePort: 20, + appSidePort: 0, + ble, + }); + this.globalData.messageBuilder = messageBuilder; + messageBuilder.connect(); + }, + + onDestroy(options) { + console.log("app on destroy invoke"); + this.globalData.messageBuilder && + this.globalData.messageBuilder.disConnect(); + }, +}); diff --git a/app.json b/app.json new file mode 100644 index 0000000..eac157c --- /dev/null +++ b/app.json @@ -0,0 +1,57 @@ +{ + "configVersion": "v2", + "app": { + "appId": 28358, + "appName": "ZeppSpace", + "appType": "app", + "version": { + "code": 1, + "name": "1.0.1" + }, + "icon": "icon.png", + "vender": "pauljako", + "description": "A ZeppOS App for SpaceAPI" + }, + "permissions": [ + "data:os.device.info", + "device:os.local_storage", + "device:os.geolocation" + ], + "runtime": { + "apiVersion": { + "compatible": "2.0.0", + "target": "2.0.0", + "minVersion": "2.0" + } + }, + "debug": false, + "targets": { + "bip5": { + "module": { + "page": { + "pages": ["pages/home/index", "pages/status/index"] + }, + "app-side": { + "path": "app-side/index" + } + }, + "platforms": [ + { + "name": "bip5", + "deviceSource": 8454400 + }, + { + "name": "bip5-w", + "deviceSource": 8454401 + } + ], + "designWidth": 380 + } + }, + "i18n": { + "en-US": { + "appName": "ZeppSpace" + } + }, + "defaultLanguage": "en-US" +} diff --git a/assets/bip5/icon.png b/assets/bip5/icon.png new file mode 100644 index 0000000..7f0f900 Binary files /dev/null and b/assets/bip5/icon.png differ diff --git a/pages/home/i18n/en-US.po b/pages/home/i18n/en-US.po new file mode 100644 index 0000000..e168a33 --- /dev/null +++ b/pages/home/i18n/en-US.po @@ -0,0 +1,2 @@ +msgid "example" +msgstr "This is an example in device" \ No newline at end of file diff --git a/pages/home/index.js b/pages/home/index.js new file mode 100644 index 0000000..6890dcb --- /dev/null +++ b/pages/home/index.js @@ -0,0 +1,78 @@ +import { + createWidget, + widget, + align, + updateStatusBarTitle, + event, +} from "@zos/ui"; +import { log as Logger, px } from "@zos/utils"; +import { push } from "@zos/router"; +import { + DEFAULT_COLOR, + DEFAULT_COLOR_TRANSPARENT, +} from "../../utils/config/constants"; +import { DEVICE_WIDTH } from "../../utils/config/device"; +import VisLog from "@silver-zepp/vis-log"; +const vis = new VisLog("index.js"); + +const logger = Logger.getLogger("ZeppSpace"); +const { messageBuilder } = getApp()._options.globalData; + +let spaces = {}; + +Page({ + state: {}, + build() { + updateStatusBarTitle("Spaces (Loading)"); + this.fetchData(); + }, + + fetchData() { + messageBuilder + .request({ + method: "GET_DIRECTORY", + }) + .then((data) => { + vis.log("data received"); + const statusSuccess = data["success"]; + if (!statusSuccess) { + return; + } + + let spaceNames = Object.fromEntries( + Object.entries(data["result"]).sort(([a], [b]) => a.localeCompare(b)), + ); + + let yPos = 70; + for (let key in spaceNames) { + if (spaceNames.hasOwnProperty(key)) { + spaces[key] = createWidget(widget.TEXT, { + x: 0, + y: px(yPos), + w: px(DEVICE_WIDTH), + h: px(50), + align_h: align.CENTER_H, + align_v: align.CENTER_V, + text_size: px(32), + color: 0xffffff, + text: key, + }); + + spaces[key].addEventListener(event.CLICK_UP, (info) => { + push({ + url: "pages/status/index", + params: { + name: key, + url: spaceNames[key], + }, + }); + }); + + yPos += 50; + } + } + + updateStatusBarTitle("Spaces"); + }); + }, +}); diff --git a/pages/status/i18n/en-US.po b/pages/status/i18n/en-US.po new file mode 100644 index 0000000..e168a33 --- /dev/null +++ b/pages/status/i18n/en-US.po @@ -0,0 +1,2 @@ +msgid "example" +msgstr "This is an example in device" \ No newline at end of file diff --git a/pages/status/index.js b/pages/status/index.js new file mode 100644 index 0000000..cf901bd --- /dev/null +++ b/pages/status/index.js @@ -0,0 +1,110 @@ +import { createWidget, widget, align, updateStatusBarTitle } from "@zos/ui"; +import { log as Logger, px } from "@zos/utils"; +import { back } from "@zos/router"; +import { + DEFAULT_COLOR, + DEFAULT_COLOR_TRANSPARENT, +} from "../../utils/config/constants"; +import { DEVICE_WIDTH } from "../../utils/config/device"; +import VisLog from "@silver-zepp/vis-log"; +const vis = new VisLog("index.js"); + +const logger = Logger.getLogger("ZeppSpace"); +const { messageBuilder } = getApp()._options.globalData; + +let title = ""; +let url = ""; + +Page({ + state: {}, + onInit(params) { + const data = JSON.parse(params); + if (data["url"] == undefined || data["name"] == undefined) { + back(); + return; + } + title = data["name"]; + url = data["url"]; + }, + build() { + updateStatusBarTitle(title + " (Loading)"); + this.fetchData(); + }, + + fetchData() { + messageBuilder + .request({ + method: "GET_STATUS", + url: url, + }) + .then((data) => { + const statusSuccess = data["success"]; + if (!statusSuccess) { + return; + } + + let open = data["result"]["state"]["open"]; + + let openText = open ? "Yes" : "No"; + + createWidget(widget.TEXT, { + x: 0, + y: px(70), + w: px(DEVICE_WIDTH), + h: px(50), + align_h: align.CENTER_H, + align_v: align.CENTER_V, + text_size: px(32), + color: 0xffffff, + text: "Open: " + openText, + }); + + createWidget(widget.TEXT, { + x: 0, + y: px(120), + w: px(DEVICE_WIDTH), + h: px(50), + align_h: align.CENTER_H, + align_v: align.CENTER_V, + text_size: px(32), + color: 0xffffff, + text: + "Last Updated: " + + this.convertTime(data["result"]["state"]["lastchange"]), + }); + + updateStatusBarTitle(title); + }); + }, + + convertTime(timestamp) { + var a = new Date(timestamp * 1000); + var today = new Date(); + var yesterday = new Date(Date.now() - 86400000); + var months = [ + "Jan", + "Feb", + "Mar", + "Apr", + "May", + "Jun", + "Jul", + "Aug", + "Sep", + "Oct", + "Nov", + "Dec", + ]; + var year = a.getFullYear(); + var month = months[a.getMonth()]; + var date = a.getDate(); + var hour = a.getHours() < 10 ? "0" + a.getHours() : a.getHours(); + var min = a.getMinutes() < 10 ? "0" + a.getMinutes() : a.getMinutes(); + if (a.setHours(0, 0, 0, 0) == today.setHours(0, 0, 0, 0)) + return hour + ":" + min; + else if (a.setHours(0, 0, 0, 0) == yesterday.setHours(0, 0, 0, 0)) + return "Yesterday, " + hour + ":" + min; + else if (year == today.getFullYear()) return date + " " + month; + else return date + " " + month + " " + year; + }, +}); diff --git a/shared/data.js b/shared/data.js new file mode 100644 index 0000000..debc2e7 --- /dev/null +++ b/shared/data.js @@ -0,0 +1,67 @@ +export function json2buf(json) { + return str2buf(json2str(json)) +} + +export function json2bin(json) { + return str2bin(json2str(json)) +} + +export function len(binOrBuf) { + return binOrBuf.byteLength +} + +export function buf2json(buf) { + return str2json(buf2str(buf)) +} + +export function str2json(str) { + return JSON.parse(str) +} + +export function json2str(json) { + return JSON.stringify(json) +} + +export function str2buf(str) { + return Buffer.from(str, 'utf-8') +} + +export function buf2str(buf) { + return buf.toString('utf-8') +} + +export function bin2buf(bin) { + return Buffer.from(bin) +} + +export function buf2bin(buf) { + return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength) +} + +export function buf2hex(buf) { + return buf.toString('hex') +} + +export function bin2hex(bin) { + return buf2hex(bin2buf(bin)) +} + +export function bin2json(bin) { + return buf2json(bin2buf(bin)) +} + +export function bin2str(bin) { + return buf2str(bin2buf(bin)) +} + +export function str2bin(str) { + return buf2bin(str2buf(str)) +} + +export function allocOfBin(size = 0) { + return Buffer.alloc(size).buffer +} + +export function allocOfBuf(size = 0) { + return Buffer.alloc(size) +} diff --git a/shared/defer.js b/shared/defer.js new file mode 100644 index 0000000..9017bda --- /dev/null +++ b/shared/defer.js @@ -0,0 +1,35 @@ +export function Deferred() { + const defer = {} + + defer.promise = new Promise(function (resolve, reject) { + defer.resolve = resolve + defer.reject = reject + }) + + return defer +} + +export function delay(ms) { + const defer = Deferred() + + setTimeout(defer.resolve, ms) + + return defer.promise +} + +export function timeout(ms, cb) { + const defer = Deferred() + ms = ms || 1000 + + const wait = setTimeout(() => { + clearTimeout(wait) + + if (cb) { + cb && cb(defer.resolve, defer.reject) + } else { + defer.reject('Timed out in ' + ms + 'ms.') + } + }, ms) + + return defer.promise +} diff --git a/shared/device-polyfill.js b/shared/device-polyfill.js new file mode 100644 index 0000000..f5abec6 --- /dev/null +++ b/shared/device-polyfill.js @@ -0,0 +1,6 @@ +import './es6-promise' +ES6Promise.polyfill() + +Promise._setScheduler(function (flush) { + flush && flush() +}) diff --git a/shared/es6-promise.js b/shared/es6-promise.js new file mode 100644 index 0000000..0e1c11c --- /dev/null +++ b/shared/es6-promise.js @@ -0,0 +1,1149 @@ +/*! + * @overview es6-promise - a tiny implementation of Promises/A+. + * @copyright Copyright (c) 2014 Yehuda Katz, Tom Dale, Stefan Penner and contributors (Conversion to ES6 API by Jake Archibald) + * @license Licensed under MIT license + * See https://raw.githubusercontent.com/stefanpenner/es6-promise/master/LICENSE + * @version v4.2.8+1e68dce6 + */ + +(function (global, factory) { + global.ES6Promise = factory(); +}(globalThis, (function () { 'use strict'; + +function objectOrFunction(x) { + var type = typeof x; + return x !== null && (type === 'object' || type === 'function'); +} + +function isFunction(x) { + return typeof x === 'function'; +} + + + +var _isArray = void 0; +if (Array.isArray) { + _isArray = Array.isArray; +} else { + _isArray = function (x) { + return Object.prototype.toString.call(x) === '[object Array]'; + }; +} + +var isArray = _isArray; + +var len = 0; +var vertxNext = void 0; +var customSchedulerFn = void 0; + +var asap = function asap(callback, arg) { + queue[len] = callback; + queue[len + 1] = arg; + len += 2; + if (len === 2) { + // If len is 2, that means that we need to schedule an async flush. + // If additional callbacks are queued before the queue is flushed, they + // will be processed by this flush that we are scheduling. + if (customSchedulerFn) { + customSchedulerFn(flush); + } else { + scheduleFlush(); + } + } +}; + +function setScheduler(scheduleFn) { + customSchedulerFn = scheduleFn; +} + +function setAsap(asapFn) { + asap = asapFn; +} + +var browserWindow = typeof window !== 'undefined' ? window : undefined; +var browserGlobal = browserWindow || {}; +var BrowserMutationObserver = browserGlobal.MutationObserver || browserGlobal.WebKitMutationObserver; +var isNode = typeof self === 'undefined' && typeof process !== 'undefined' && {}.toString.call(process) === '[object process]'; + +// test for web worker but not in IE10 +var isWorker = typeof Uint8ClampedArray !== 'undefined' && typeof importScripts !== 'undefined' && typeof MessageChannel !== 'undefined'; + +// node +function useNextTick() { + // node version 0.10.x displays a deprecation warning when nextTick is used recursively + // see https://github.com/cujojs/when/issues/410 for details + return function () { + return process.nextTick(flush); + }; +} + +// vertx +function useVertxTimer() { + if (typeof vertxNext !== 'undefined') { + return function () { + vertxNext(flush); + }; + } + + return useSetTimeout(); +} + +function useMutationObserver() { + var iterations = 0; + var observer = new BrowserMutationObserver(flush); + var node = document.createTextNode(''); + observer.observe(node, { characterData: true }); + + return function () { + node.data = iterations = ++iterations % 2; + }; +} + +// web worker +function useMessageChannel() { + var channel = new MessageChannel(); + channel.port1.onmessage = flush; + return function () { + return channel.port2.postMessage(0); + }; +} + +function useSetTimeout() { + // Store setTimeout reference so es6-promise will be unaffected by + // other code modifying setTimeout (like sinon.useFakeTimers()) + var globalSetTimeout = setTimeout; + return function () { + return globalSetTimeout(flush, 1); + }; +} + +var queue = new Array(1000); +function flush() { + for (var i = 0; i < len; i += 2) { + var callback = queue[i]; + var arg = queue[i + 1]; + + callback(arg); + + queue[i] = undefined; + queue[i + 1] = undefined; + } + + len = 0; +} + +function attemptVertx() { + try { + var vertx = Function('return this')().require('vertx'); + vertxNext = vertx.runOnLoop || vertx.runOnContext; + return useVertxTimer(); + } catch (e) { + return useSetTimeout(); + } +} + +var scheduleFlush = void 0; +// Decide what async method to use to triggering processing of queued callbacks: +if (isNode) { + scheduleFlush = useNextTick(); +} else if (BrowserMutationObserver) { + scheduleFlush = useMutationObserver(); +} else if (isWorker) { + scheduleFlush = useMessageChannel(); +} else if (browserWindow === undefined && typeof require === 'function') { + scheduleFlush = attemptVertx(); +} else { + scheduleFlush = useSetTimeout(); +} + +function then(onFulfillment, onRejection) { + var parent = this; + + var child = new this.constructor(noop); + + if (child[PROMISE_ID] === undefined) { + makePromise(child); + } + + var _state = parent._state; + + + if (_state) { + var callback = arguments[_state - 1]; + asap(function () { + return invokeCallback(_state, child, callback, parent._result); + }); + } else { + subscribe(parent, child, onFulfillment, onRejection); + } + + return child; +} + +/** + `Promise.resolve` returns a promise that will become resolved with the + passed `value`. It is shorthand for the following: + + ```javascript + let promise = new Promise(function(resolve, reject){ + resolve(1); + }); + + promise.then(function(value){ + // value === 1 + }); + ``` + + Instead of writing the above, your code now simply becomes the following: + + ```javascript + let promise = Promise.resolve(1); + + promise.then(function(value){ + // value === 1 + }); + ``` + + @method resolve + @static + @param {Any} value value that the returned promise will be resolved with + Useful for tooling. + @return {Promise} a promise that will become fulfilled with the given + `value` +*/ +function resolve$1(object) { + /*jshint validthis:true */ + var Constructor = this; + + if (object && typeof object === 'object' && object.constructor === Constructor) { + return object; + } + + var promise = new Constructor(noop); + resolve(promise, object); + return promise; +} + +var PROMISE_ID = Math.random().toString(36).substring(2); + +function noop() {} + +var PENDING = void 0; +var FULFILLED = 1; +var REJECTED = 2; + +function selfFulfillment() { + return new TypeError("You cannot resolve a promise with itself"); +} + +function cannotReturnOwn() { + return new TypeError('A promises callback cannot return that same promise.'); +} + +function tryThen(then$$1, value, fulfillmentHandler, rejectionHandler) { + try { + then$$1.call(value, fulfillmentHandler, rejectionHandler); + } catch (e) { + return e; + } +} + +function handleForeignThenable(promise, thenable, then$$1) { + asap(function (promise) { + var sealed = false; + var error = tryThen(then$$1, thenable, function (value) { + if (sealed) { + return; + } + sealed = true; + if (thenable !== value) { + resolve(promise, value); + } else { + fulfill(promise, value); + } + }, function (reason) { + if (sealed) { + return; + } + sealed = true; + + reject(promise, reason); + }, 'Settle: ' + (promise._label || ' unknown promise')); + + if (!sealed && error) { + sealed = true; + reject(promise, error); + } + }, promise); +} + +function handleOwnThenable(promise, thenable) { + if (thenable._state === FULFILLED) { + fulfill(promise, thenable._result); + } else if (thenable._state === REJECTED) { + reject(promise, thenable._result); + } else { + subscribe(thenable, undefined, function (value) { + return resolve(promise, value); + }, function (reason) { + return reject(promise, reason); + }); + } +} + +function handleMaybeThenable(promise, maybeThenable, then$$1) { + if (maybeThenable.constructor === promise.constructor && then$$1 === then && maybeThenable.constructor.resolve === resolve$1) { + handleOwnThenable(promise, maybeThenable); + } else { + if (then$$1 === undefined) { + fulfill(promise, maybeThenable); + } else if (isFunction(then$$1)) { + handleForeignThenable(promise, maybeThenable, then$$1); + } else { + fulfill(promise, maybeThenable); + } + } +} + +function resolve(promise, value) { + if (promise === value) { + reject(promise, selfFulfillment()); + } else if (objectOrFunction(value)) { + var then$$1 = void 0; + try { + then$$1 = value.then; + } catch (error) { + reject(promise, error); + return; + } + handleMaybeThenable(promise, value, then$$1); + } else { + fulfill(promise, value); + } +} + +function publishRejection(promise) { + if (promise._onerror) { + promise._onerror(promise._result); + } + + publish(promise); +} + +function fulfill(promise, value) { + if (promise._state !== PENDING) { + return; + } + + promise._result = value; + promise._state = FULFILLED; + + if (promise._subscribers.length !== 0) { + asap(publish, promise); + } +} + +function reject(promise, reason) { + if (promise._state !== PENDING) { + return; + } + promise._state = REJECTED; + promise._result = reason; + + asap(publishRejection, promise); +} + +function subscribe(parent, child, onFulfillment, onRejection) { + var _subscribers = parent._subscribers; + var length = _subscribers.length; + + + parent._onerror = null; + + _subscribers[length] = child; + _subscribers[length + FULFILLED] = onFulfillment; + _subscribers[length + REJECTED] = onRejection; + + if (length === 0 && parent._state) { + asap(publish, parent); + } +} + +function publish(promise) { + var subscribers = promise._subscribers; + var settled = promise._state; + + if (subscribers.length === 0) { + return; + } + + var child = void 0, + callback = void 0, + detail = promise._result; + + for (var i = 0; i < subscribers.length; i += 3) { + child = subscribers[i]; + callback = subscribers[i + settled]; + + if (child) { + invokeCallback(settled, child, callback, detail); + } else { + callback(detail); + } + } + + promise._subscribers.length = 0; +} + +function invokeCallback(settled, promise, callback, detail) { + var hasCallback = isFunction(callback), + value = void 0, + error = void 0, + succeeded = true; + + if (hasCallback) { + try { + value = callback(detail); + } catch (e) { + succeeded = false; + error = e; + } + + if (promise === value) { + reject(promise, cannotReturnOwn()); + return; + } + } else { + value = detail; + } + + if (promise._state !== PENDING) { + // noop + } else if (hasCallback && succeeded) { + resolve(promise, value); + } else if (succeeded === false) { + reject(promise, error); + } else if (settled === FULFILLED) { + fulfill(promise, value); + } else if (settled === REJECTED) { + reject(promise, value); + } +} + +function initializePromise(promise, resolver) { + try { + resolver(function resolvePromise(value) { + resolve(promise, value); + }, function rejectPromise(reason) { + reject(promise, reason); + }); + } catch (e) { + reject(promise, e); + } +} + +var id = 0; +function nextId() { + return id++; +} + +function makePromise(promise) { + promise[PROMISE_ID] = id++; + promise._state = undefined; + promise._result = undefined; + promise._subscribers = []; +} + +function validationError() { + return new Error('Array Methods must be provided an Array'); +} + +var Enumerator = function () { + function Enumerator(Constructor, input) { + this._instanceConstructor = Constructor; + this.promise = new Constructor(noop); + + if (!this.promise[PROMISE_ID]) { + makePromise(this.promise); + } + + if (isArray(input)) { + this.length = input.length; + this._remaining = input.length; + + this._result = new Array(this.length); + + if (this.length === 0) { + fulfill(this.promise, this._result); + } else { + this.length = this.length || 0; + this._enumerate(input); + if (this._remaining === 0) { + fulfill(this.promise, this._result); + } + } + } else { + reject(this.promise, validationError()); + } + } + + Enumerator.prototype._enumerate = function _enumerate(input) { + for (var i = 0; this._state === PENDING && i < input.length; i++) { + this._eachEntry(input[i], i); + } + }; + + Enumerator.prototype._eachEntry = function _eachEntry(entry, i) { + var c = this._instanceConstructor; + var resolve$$1 = c.resolve; + + + if (resolve$$1 === resolve$1) { + var _then = void 0; + var error = void 0; + var didError = false; + try { + _then = entry.then; + } catch (e) { + didError = true; + error = e; + } + + if (_then === then && entry._state !== PENDING) { + this._settledAt(entry._state, i, entry._result); + } else if (typeof _then !== 'function') { + this._remaining--; + this._result[i] = entry; + } else if (c === Promise$1) { + var promise = new c(noop); + if (didError) { + reject(promise, error); + } else { + handleMaybeThenable(promise, entry, _then); + } + this._willSettleAt(promise, i); + } else { + this._willSettleAt(new c(function (resolve$$1) { + return resolve$$1(entry); + }), i); + } + } else { + this._willSettleAt(resolve$$1(entry), i); + } + }; + + Enumerator.prototype._settledAt = function _settledAt(state, i, value) { + var promise = this.promise; + + + if (promise._state === PENDING) { + this._remaining--; + + if (state === REJECTED) { + reject(promise, value); + } else { + this._result[i] = value; + } + } + + if (this._remaining === 0) { + fulfill(promise, this._result); + } + }; + + Enumerator.prototype._willSettleAt = function _willSettleAt(promise, i) { + var enumerator = this; + + subscribe(promise, undefined, function (value) { + return enumerator._settledAt(FULFILLED, i, value); + }, function (reason) { + return enumerator._settledAt(REJECTED, i, reason); + }); + }; + + return Enumerator; +}(); + +/** + `Promise.all` accepts an array of promises, and returns a new promise which + is fulfilled with an array of fulfillment values for the passed promises, or + rejected with the reason of the first passed promise to be rejected. It casts all + elements of the passed iterable to promises as it runs this algorithm. + + Example: + + ```javascript + let promise1 = resolve(1); + let promise2 = resolve(2); + let promise3 = resolve(3); + let promises = [ promise1, promise2, promise3 ]; + + Promise.all(promises).then(function(array){ + // The array here would be [ 1, 2, 3 ]; + }); + ``` + + If any of the `promises` given to `all` are rejected, the first promise + that is rejected will be given as an argument to the returned promises's + rejection handler. For example: + + Example: + + ```javascript + let promise1 = resolve(1); + let promise2 = reject(new Error("2")); + let promise3 = reject(new Error("3")); + let promises = [ promise1, promise2, promise3 ]; + + Promise.all(promises).then(function(array){ + // Code here never runs because there are rejected promises! + }, function(error) { + // error.message === "2" + }); + ``` + + @method all + @static + @param {Array} entries array of promises + @param {String} label optional string for labeling the promise. + Useful for tooling. + @return {Promise} promise that is fulfilled when all `promises` have been + fulfilled, or rejected if any of them become rejected. + @static +*/ +function all(entries) { + return new Enumerator(this, entries).promise; +} + +/** + `Promise.race` returns a new promise which is settled in the same way as the + first passed promise to settle. + + Example: + + ```javascript + let promise1 = new Promise(function(resolve, reject){ + setTimeout(function(){ + resolve('promise 1'); + }, 200); + }); + + let promise2 = new Promise(function(resolve, reject){ + setTimeout(function(){ + resolve('promise 2'); + }, 100); + }); + + Promise.race([promise1, promise2]).then(function(result){ + // result === 'promise 2' because it was resolved before promise1 + // was resolved. + }); + ``` + + `Promise.race` is deterministic in that only the state of the first + settled promise matters. For example, even if other promises given to the + `promises` array argument are resolved, but the first settled promise has + become rejected before the other promises became fulfilled, the returned + promise will become rejected: + + ```javascript + let promise1 = new Promise(function(resolve, reject){ + setTimeout(function(){ + resolve('promise 1'); + }, 200); + }); + + let promise2 = new Promise(function(resolve, reject){ + setTimeout(function(){ + reject(new Error('promise 2')); + }, 100); + }); + + Promise.race([promise1, promise2]).then(function(result){ + // Code here never runs + }, function(reason){ + // reason.message === 'promise 2' because promise 2 became rejected before + // promise 1 became fulfilled + }); + ``` + + An example real-world use case is implementing timeouts: + + ```javascript + Promise.race([ajax('foo.json'), timeout(5000)]) + ``` + + @method race + @static + @param {Array} promises array of promises to observe + Useful for tooling. + @return {Promise} a promise which settles in the same way as the first passed + promise to settle. +*/ +function race(entries) { + /*jshint validthis:true */ + var Constructor = this; + + if (!isArray(entries)) { + return new Constructor(function (_, reject) { + return reject(new TypeError('You must pass an array to race.')); + }); + } else { + return new Constructor(function (resolve, reject) { + var length = entries.length; + for (var i = 0; i < length; i++) { + Constructor.resolve(entries[i]).then(resolve, reject); + } + }); + } +} + +/** + `Promise.reject` returns a promise rejected with the passed `reason`. + It is shorthand for the following: + + ```javascript + let promise = new Promise(function(resolve, reject){ + reject(new Error('WHOOPS')); + }); + + promise.then(function(value){ + // Code here doesn't run because the promise is rejected! + }, function(reason){ + // reason.message === 'WHOOPS' + }); + ``` + + Instead of writing the above, your code now simply becomes the following: + + ```javascript + let promise = Promise.reject(new Error('WHOOPS')); + + promise.then(function(value){ + // Code here doesn't run because the promise is rejected! + }, function(reason){ + // reason.message === 'WHOOPS' + }); + ``` + + @method reject + @static + @param {Any} reason value that the returned promise will be rejected with. + Useful for tooling. + @return {Promise} a promise rejected with the given `reason`. +*/ +function reject$1(reason) { + /*jshint validthis:true */ + var Constructor = this; + var promise = new Constructor(noop); + reject(promise, reason); + return promise; +} + +function needsResolver() { + throw new TypeError('You must pass a resolver function as the first argument to the promise constructor'); +} + +function needsNew() { + throw new TypeError("Failed to construct 'Promise': Please use the 'new' operator, this object constructor cannot be called as a function."); +} + +/** + Promise objects represent the eventual result of an asynchronous operation. The + primary way of interacting with a promise is through its `then` method, which + registers callbacks to receive either a promise's eventual value or the reason + why the promise cannot be fulfilled. + + Terminology + ----------- + + - `promise` is an object or function with a `then` method whose behavior conforms to this specification. + - `thenable` is an object or function that defines a `then` method. + - `value` is any legal JavaScript value (including undefined, a thenable, or a promise). + - `exception` is a value that is thrown using the throw statement. + - `reason` is a value that indicates why a promise was rejected. + - `settled` the final resting state of a promise, fulfilled or rejected. + + A promise can be in one of three states: pending, fulfilled, or rejected. + + Promises that are fulfilled have a fulfillment value and are in the fulfilled + state. Promises that are rejected have a rejection reason and are in the + rejected state. A fulfillment value is never a thenable. + + Promises can also be said to *resolve* a value. If this value is also a + promise, then the original promise's settled state will match the value's + settled state. So a promise that *resolves* a promise that rejects will + itself reject, and a promise that *resolves* a promise that fulfills will + itself fulfill. + + + Basic Usage: + ------------ + + ```js + let promise = new Promise(function(resolve, reject) { + // on success + resolve(value); + + // on failure + reject(reason); + }); + + promise.then(function(value) { + // on fulfillment + }, function(reason) { + // on rejection + }); + ``` + + Advanced Usage: + --------------- + + Promises shine when abstracting away asynchronous interactions such as + `XMLHttpRequest`s. + + ```js + function getJSON(url) { + return new Promise(function(resolve, reject){ + let xhr = new XMLHttpRequest(); + + xhr.open('GET', url); + xhr.onreadystatechange = handler; + xhr.responseType = 'json'; + xhr.setRequestHeader('Accept', 'application/json'); + xhr.send(); + + function handler() { + if (this.readyState === this.DONE) { + if (this.status === 200) { + resolve(this.response); + } else { + reject(new Error('getJSON: `' + url + '` failed with status: [' + this.status + ']')); + } + } + }; + }); + } + + getJSON('/posts.json').then(function(json) { + // on fulfillment + }, function(reason) { + // on rejection + }); + ``` + + Unlike callbacks, promises are great composable primitives. + + ```js + Promise.all([ + getJSON('/posts'), + getJSON('/comments') + ]).then(function(values){ + values[0] // => postsJSON + values[1] // => commentsJSON + + return values; + }); + ``` + + @class Promise + @param {Function} resolver + Useful for tooling. + @constructor +*/ + +var Promise$1 = function () { + function Promise(resolver) { + this[PROMISE_ID] = nextId(); + this._result = this._state = undefined; + this._subscribers = []; + + if (noop !== resolver) { + typeof resolver !== 'function' && needsResolver(); + this instanceof Promise ? initializePromise(this, resolver) : needsNew(); + } + } + + /** + The primary way of interacting with a promise is through its `then` method, + which registers callbacks to receive either a promise's eventual value or the + reason why the promise cannot be fulfilled. + ```js + findUser().then(function(user){ + // user is available + }, function(reason){ + // user is unavailable, and you are given the reason why + }); + ``` + Chaining + -------- + The return value of `then` is itself a promise. This second, 'downstream' + promise is resolved with the return value of the first promise's fulfillment + or rejection handler, or rejected if the handler throws an exception. + ```js + findUser().then(function (user) { + return user.name; + }, function (reason) { + return 'default name'; + }).then(function (userName) { + // If `findUser` fulfilled, `userName` will be the user's name, otherwise it + // will be `'default name'` + }); + findUser().then(function (user) { + throw new Error('Found user, but still unhappy'); + }, function (reason) { + throw new Error('`findUser` rejected and we're unhappy'); + }).then(function (value) { + // never reached + }, function (reason) { + // if `findUser` fulfilled, `reason` will be 'Found user, but still unhappy'. + // If `findUser` rejected, `reason` will be '`findUser` rejected and we're unhappy'. + }); + ``` + If the downstream promise does not specify a rejection handler, rejection reasons will be propagated further downstream. + ```js + findUser().then(function (user) { + throw new PedagogicalException('Upstream error'); + }).then(function (value) { + // never reached + }).then(function (value) { + // never reached + }, function (reason) { + // The `PedgagocialException` is propagated all the way down to here + }); + ``` + Assimilation + ------------ + Sometimes the value you want to propagate to a downstream promise can only be + retrieved asynchronously. This can be achieved by returning a promise in the + fulfillment or rejection handler. The downstream promise will then be pending + until the returned promise is settled. This is called *assimilation*. + ```js + findUser().then(function (user) { + return findCommentsByAuthor(user); + }).then(function (comments) { + // The user's comments are now available + }); + ``` + If the assimliated promise rejects, then the downstream promise will also reject. + ```js + findUser().then(function (user) { + return findCommentsByAuthor(user); + }).then(function (comments) { + // If `findCommentsByAuthor` fulfills, we'll have the value here + }, function (reason) { + // If `findCommentsByAuthor` rejects, we'll have the reason here + }); + ``` + Simple Example + -------------- + Synchronous Example + ```javascript + let result; + try { + result = findResult(); + // success + } catch(reason) { + // failure + } + ``` + Errback Example + ```js + findResult(function(result, err){ + if (err) { + // failure + } else { + // success + } + }); + ``` + Promise Example; + ```javascript + findResult().then(function(result){ + // success + }, function(reason){ + // failure + }); + ``` + Advanced Example + -------------- + Synchronous Example + ```javascript + let author, books; + try { + author = findAuthor(); + books = findBooksByAuthor(author); + // success + } catch(reason) { + // failure + } + ``` + Errback Example + ```js + function foundBooks(books) { + } + function failure(reason) { + } + findAuthor(function(author, err){ + if (err) { + failure(err); + // failure + } else { + try { + findBoooksByAuthor(author, function(books, err) { + if (err) { + failure(err); + } else { + try { + foundBooks(books); + } catch(reason) { + failure(reason); + } + } + }); + } catch(error) { + failure(err); + } + // success + } + }); + ``` + Promise Example; + ```javascript + findAuthor(). + then(findBooksByAuthor). + then(function(books){ + // found books + }).catch(function(reason){ + // something went wrong + }); + ``` + @method then + @param {Function} onFulfilled + @param {Function} onRejected + Useful for tooling. + @return {Promise} + */ + + /** + `catch` is simply sugar for `then(undefined, onRejection)` which makes it the same + as the catch block of a try/catch statement. + ```js + function findAuthor(){ + throw new Error('couldn't find that author'); + } + // synchronous + try { + findAuthor(); + } catch(reason) { + // something went wrong + } + // async with promises + findAuthor().catch(function(reason){ + // something went wrong + }); + ``` + @method catch + @param {Function} onRejection + Useful for tooling. + @return {Promise} + */ + + + Promise.prototype.catch = function _catch(onRejection) { + return this.then(null, onRejection); + }; + + /** + `finally` will be invoked regardless of the promise's fate just as native + try/catch/finally behaves + + Synchronous example: + + ```js + findAuthor() { + if (Math.random() > 0.5) { + throw new Error(); + } + return new Author(); + } + + try { + return findAuthor(); // succeed or fail + } catch(error) { + return findOtherAuther(); + } finally { + // always runs + // doesn't affect the return value + } + ``` + + Asynchronous example: + + ```js + findAuthor().catch(function(reason){ + return findOtherAuther(); + }).finally(function(){ + // author was either found, or not + }); + ``` + + @method finally + @param {Function} callback + @return {Promise} + */ + + + Promise.prototype.finally = function _finally(callback) { + var promise = this; + var constructor = promise.constructor; + + if (isFunction(callback)) { + return promise.then(function (value) { + return constructor.resolve(callback()).then(function () { + return value; + }); + }, function (reason) { + return constructor.resolve(callback()).then(function () { + throw reason; + }); + }); + } + + return promise.then(callback, callback); + }; + + return Promise; +}(); + +Promise$1.prototype.then = then; +Promise$1.all = all; +Promise$1.race = race; +Promise$1.resolve = resolve$1; +Promise$1.reject = reject$1; +Promise$1._setScheduler = setScheduler; +Promise$1._setAsap = setAsap; +Promise$1._asap = asap; +Promise$1.debug = function _debug() { + return '[object ES6Promise]' +}; + + +/*global self*/ +function polyfill() { + var local = globalThis; + + local.Promise = Promise$1; +} + +// Strange compat.. +Promise$1.polyfill = polyfill; +Promise$1.Promise = Promise$1; + +return Promise$1; + +}))); + + + +//# sourceMappingURL=es6-promise.map \ No newline at end of file diff --git a/shared/event.js b/shared/event.js new file mode 100644 index 0000000..e4a2441 --- /dev/null +++ b/shared/event.js @@ -0,0 +1,42 @@ +export class EventBus { + constructor() { + this.map = new Map() + } + + on(type, cb) { + if (this.map.has(type)) { + this.map.get(type).push(cb) + } else { + this.map.set(type, [cb]) + } + } + + off(type, cb) { + if (type) { + if (cb) { + const cbs = this.map.get(type) + + if (!cbs) return + const index = cbs.findIndex((i) => i === cb) + + if (index >= 0) { + cbs.splice(index, 1) + } + } else { + this.map.delete(type) + } + } else { + this.map.clear() + } + } + + emit(type, ...args) { + for (let cb of this.map.get(type) ? this.map.get(type) : []) { + cb && cb(...args) + } + } + + count(type) { + return this.map.get(type) ? this.map.get(type).length : 0 + } +} diff --git a/shared/message-side.js b/shared/message-side.js new file mode 100644 index 0000000..ae0d9c2 --- /dev/null +++ b/shared/message-side.js @@ -0,0 +1,1145 @@ +import { EventBus } from './event' +import { Deferred, timeout } from './defer' +import { json2buf, buf2json, bin2hex, str2buf, buf2str } from './data' + +let logger + +function initLogger() { + logger = Logger.getLogger('side-message') +} + +const DEBUG = true + +export const MESSAGE_SIZE = 3600 +export const MESSAGE_HEADER = 16 +export const MESSAGE_PAYLOAD = MESSAGE_SIZE - MESSAGE_HEADER +export const HM_MESSAGE_PROTO_HEADER = 66 +export const HM_MESSAGE_PROTO_PAYLOAD = MESSAGE_PAYLOAD - HM_MESSAGE_PROTO_HEADER + +export const MessageFlag = { + Runtime: 0x0, + App: 0x1 +} + +export const MessageType = { + Shake: 0x1, + Close: 0x2, + Heart: 0x3, + Data: 0x4, + DataWithSystemTool: 0x5, + Log: 0x6 +} + +export const MessageRuntimeType = { + Invoke: 0x1 +} +export const MessageVersion = { + Version1: 0x1 +} + +export const MessagePayloadType = { + Request: 0x1, + Response: 0x2, + Notify: 0x3 +} + +export const DataType = { + empty: 'empty', + json: 'json', + text: 'text', + bin: 'bin' +} + +export const MessagePayloadDataTypeOp = { + EMPTY: 0x0, + TEXT: 0x1, + JSON: 0x2, + BIN: 0x3 +} + +export function getDataType(type) { + switch (type.toLowerCase()) { + case DataType.json: + return MessagePayloadDataTypeOp.JSON + case DataType.text: + return MessagePayloadDataTypeOp.TEXT + case DataType.bin: + return MessagePayloadDataTypeOp.BIN + case DataType.empty: + return MessagePayloadDataTypeOp.EMPTY + default: + return MessagePayloadDataTypeOp.TEXT + } +} + +// 中续,结束 +export const MessagePayloadOpCode = { + Continued: 0x0, + Finished: 0x1 +} + +let traceId = 10000 +export function genTraceId() { + return traceId++ +} + +let spanId = 1000 +export function genSpanId() { + return spanId++ +} + +export function getTimestamp(t = Date.now()) { + return t % 10000000 +} + +class Session extends EventBus { + constructor(id, type, ctx) { + super() + this.id = id + this.type = type // payloadType + this.ctx = ctx + this.tempBuf = null + this.chunks = [] + this.count = 0 + this.finishChunk = null + } + + addChunk(payload) { + if (payload.opCode === MessagePayloadOpCode.Finished) { + this.count = payload.seqId + this.finishChunk = payload + } + + if (payload.payloadLength !== payload.payload.byteLength) { + logger.error('receive chunk data length error, expect %d but %d', payload.payloadLength, payload.payload.byteLength) + this.emit('error', Error(`receive chunk data length error, expect ${payload.payloadLength} but ${payload.payload.byteLength}`)) + return + } + + this.chunks.push(payload) + this.checkIfReceiveAllChunks() + } + + checkIfReceiveAllChunks() { + if (this.count !== this.chunks.length) return + + for (let i = 1; i <= this.count; i++) { + const chunk = this.chunks.find((c) => c.seqId === i) + + if (!chunk) { + this.releaseBuf() + this.emit('error', Error('receive data error')) + return + } + + const buf = chunk.payload + this.tempBuf = this.tempBuf ? Buffer.concat([this.tempBuf, buf]) : buf + } + + if (!this.finishChunk) return + + this.finishChunk.payload = this.tempBuf + this.finishChunk.payloadLength = this.finishChunk.payload.byteLength + + if (this.finishChunk.totalLength !== this.finishChunk.payloadLength) { + logger.error('receive full data length error, expect %d but %d', this.finishChunk.payloadLength, this.finishChunk.payload.byteLength) + this.emit('error', Error(`receive full data length error, expect ${this.finishChunk.payloadLength} but ${this.finishChunk.payload.byteLength}`)) + return + } + + this.emit('data', this.finishChunk) + } + + getLength() { + return this.tempBufLength + } + releaseBuf() { + this.tempBuf = null + this.chunks = [] + this.finishChunk = null + this.count = 0 + } +} + +class SessionMgr { + constructor() { + this.sessions = new Map() + } + + key(session) { + return `${session.id}:${session.type}` + } + + newSession(id, type, ctx) { + const newSession = new Session(id, type, ctx) + this.sessions.set(this.key(newSession), newSession) + return newSession + } + + destroy(session) { + session.releaseBuf() + this.sessions.delete(this.key(session)) + } + + has(id, type) { + return this.sessions.has( + this.key({ + id, + type + }) + ) + } + + getById(id, type) { + return this.sessions.get( + this.key({ + id, + type + }) + ) + } + + clear() { + this.sessions.clear() + } +} + +export class MessageBuilder extends EventBus { + constructor( + { appId = 0, appDevicePort = 20, appSidePort = 0, ble = undefined } = { + appId: 0, + appDevicePort: 20, + appSidePort: 0, + ble: undefined + } + ) { + super() + initLogger() + this.isDevice = typeof __ZEPPOS__ !== 'undefined' + this.isSide = !this.isDevice + + this.appId = appId + this.appDevicePort = appDevicePort + this.appSidePort = appSidePort + this.ble = ble + this.sendMsg = this.getSafeSend() + this.chunkSize = MESSAGE_PAYLOAD + this.tempBuf = null + this.shakeTask = Deferred() + this.waitingShakePromise = this.shakeTask.promise + this.sessionMgr = new SessionMgr() + } + + getMessageSize() { + return MESSAGE_SIZE + } + + getMessagePayloadSize() { + return MESSAGE_PAYLOAD + } + + getMessageHeaderSize() { + return MESSAGE_HEADER + } + + buf2Json(buf) { + return buf2json(buf) + } + + json2Buf(json) { + return json2buf(json) + } + + now(t = Date.now()) { + return getTimestamp(t) + } + + connect(cb) { + this.on('message', (message) => { + this.onMessage(message) + }) + + this.ble && + this.ble.createConnect((index, data, size) => { + DEBUG && logger.warn('[RAW] [R] receive index=>%d size=>%d bin=>%s', index, size, bin2hex(data)) + this.onFragmentData(data) + }) + + this.sendShake() + cb && cb(this) + } + + disConnect(cb) { + logger.debug('app ble disconnect') + this.sendClose() + this.off('message') + this.ble && this.ble.disConnect() + + cb && cb(this) + } + + listen(cb) { + if (typeof messaging === 'undefined') { + return + } + + messaging && + messaging.peerSocket.addListener('message', (message) => { + DEBUG && logger.warn('[RAW] [R] receive size=>%d bin=>%s', message.byteLength, bin2hex(message)) + this.onMessage(message) + }) + + this.waitingShakePromise = Promise.resolve() + cb && cb(this) + } + + buildBin(data) { + if (data.payload.byteLength > this.chunkSize) { + throw new Error(`${data.payload.byteLength} greater than max size of ${this.chunkSize}`) + } + + const size = this.getMessageHeaderSize() + data.payload.byteLength + let buf = Buffer.alloc(size) + let offset = 0 + + buf.writeUInt8(data.flag, offset) + offset += 1 + + buf.writeUInt8(data.version, offset) + offset += 1 + + buf.writeUInt16LE(data.type, offset) + offset += 2 + + buf.writeUInt16LE(data.port1, offset) + offset += 2 + + buf.writeUInt16LE(data.port2, offset) + offset += 2 + + buf.writeUInt32LE(data.appId, offset) + offset += 4 + + buf.writeUInt32LE(data.extra, offset) + offset += 4 + + buf.fill(data.payload, offset, data.payload.byteLength + offset) + + return buf + } + + buildShake() { + return this.buildBin({ + flag: MessageFlag.App, + version: MessageVersion.Version1, + type: MessageType.Shake, + port1: this.appDevicePort, + port2: this.appSidePort, + appId: this.appId, + extra: 0, + payload: Buffer.from([this.appId]) + }) + } + + sendShake() { + if (this.appSidePort === 0) { + const shake = this.buildShake() + this.sendMsg(shake) + } + } + + buildClose() { + return this.buildBin({ + flag: MessageFlag.App, + version: MessageVersion.Version1, + type: MessageType.Close, + port1: this.appDevicePort, + port2: this.appSidePort, + appId: this.appId, + extra: 0, + payload: Buffer.from([this.appId]) + }) + } + + sendClose() { + if (this.appSidePort !== 0) { + const close = this.buildClose() + + this.sendMsg(close) + } + } + + readBin(arrayBuf) { + const buf = Buffer.from(arrayBuf) + let offset = 0 + + const flag = buf.readUInt8(offset) + offset += 1 + + const version = buf.readUInt8(offset) + offset += 1 + + const type = buf.readUInt16LE(offset) + offset += 2 + + const port1 = buf.readUInt16LE(offset) + offset += 2 + + const port2 = buf.readUInt16LE(offset) + offset += 2 + + const appId = buf.readUInt32LE(offset) + offset += 4 + + const extra = buf.readUInt32LE(offset) + offset += 4 + + const payload = buf.subarray(offset) + + return { + flag, + version, + type, + port1, + port2, + appId, + extra, + payload + } + } + + // opts 覆盖头部选项 + buildData(payload, opts = {}) { + return this.buildBin({ + flag: MessageFlag.App, + version: MessageVersion.Version1, + type: MessageType.Data, + port1: this.appDevicePort, + port2: this.appSidePort, + appId: this.appId, + extra: 0, + ...opts, + payload + }) + } + + sendBin(buf, debug = DEBUG) { + // ble 发送消息 + debug && logger.warn('[RAW] [S] send size=%d bin=%s', buf.byteLength, bin2hex(buf.buffer)) + const result = this.ble.send(buf.buffer, buf.byteLength) + + if (!result) { + throw Error('send message error') + } + } + + sendBinBySide(buf, debug = DEBUG) { + // side 发送消息 + debug && logger.warn('[RAW] [S] send size=%d bin=%s', buf.byteLength, bin2hex(buf.buffer)) + messaging && messaging.peerSocket.send(buf.buffer) + } + + // 通用获取逻辑 + getSafeSend() { + if (this.isDevice) { + return this.sendBin.bind(this) + } else { + return this.sendBinBySide.bind(this) + } + } + + _logSend(buf) { + this.sendMsg(buf, false) + } + + // 大数据的复杂头部分包协议 + sendHmProtocol({ requestId, dataBin, type, contentType, dataType }, { messageType = MessageType.Data } = {}) { + const headerSize = 0 + const hmDataSize = HM_MESSAGE_PROTO_PAYLOAD + const userDataLength = dataBin.byteLength + + let offset = 0 + const _buf = Buffer.alloc(hmDataSize) + const traceId = requestId ? requestId : genTraceId() + const spanId = genSpanId() + let seqId = 1 + + const count = Math.ceil(userDataLength / hmDataSize) + + function genSeqId() { + return seqId++ + } + + for (let i = 1; i <= count; i++) { + this.errorIfBleDisconnect() + if (i === count) { + // last + const tailSize = userDataLength - offset + const tailBuf = Buffer.alloc(headerSize + tailSize) + + dataBin.copy(tailBuf, headerSize, offset, offset + tailSize) + offset += tailSize + this.sendDataWithSession( + { + traceId, + spanId: spanId, + seqId: genSeqId(), + payload: tailBuf, + type, + opCode: MessagePayloadOpCode.Finished, + totalLength: userDataLength, + contentType, + dataType + }, + { + messageType + } + ) + + break + } + + dataBin.copy(_buf, headerSize, offset, offset + hmDataSize) + offset += hmDataSize + + this.sendDataWithSession( + { + traceId, + spanId: spanId, + seqId: genSeqId(), + payload: _buf, + type, + opCode: MessagePayloadOpCode.Continued, + totalLength: userDataLength, + contentType, + dataType + }, + { + messageType + } + ) + } + + if (offset === userDataLength) { + DEBUG && logger.debug('HmProtocol send ok msgSize=> %d dataSize=> %d', offset, userDataLength) + } else { + DEBUG && logger.error('HmProtocol send error msgSize=> %d dataSize=> %d', offset, userDataLength) + } + } + + // 大数据的简单分包协议 + sendSimpleProtocol({ dataBin }, { messageType = MessageType.Data } = {}) { + const dataSize = this.chunkSize + const headerSize = 0 + const userDataLength = dataBin.byteLength + + let offset = 0 + const _buf = Buffer.alloc(dataSize) + + const count = Math.ceil(userDataLength / dataSize) + + for (let i = 1; i <= count; i++) { + if (i === count) { + // last + const tailSize = userDataLength - offset + const tailBuf = Buffer.alloc(headerSize + tailSize) + + dataBin.copy(tailBuf, headerSize, offset, offset + tailSize) + offset += tailSize + this.sendSimpleData( + { + payload: tailBuf + }, + { + messageType + } + ) + + break + } + + dataBin.copy(_buf, headerSize, offset, offset + dataSize) + offset += dataSize + + this.sendSimpleData( + { + payload: _buf + }, + { + messageType + } + ) + } + + if (offset === userDataLength) { + // logger.debug('SimpleProtocol send ok msgSize=> %d dataSize=> %d', offset, userDataLength) + } else { + // logger.error('SimpleProtocol send error msgSize=> %d dataSize=> %d', offset, userDataLength) + } + } + + sendJson({ requestId = 0, json, type = MessagePayloadType.Request, contentType, dataType }) { + const packageBin = json2buf(json) + const traceId = requestId ? requestId : genTraceId() + + this.sendHmProtocol({ + requestId: traceId, + dataBin: packageBin, + type, + contentType, + dataType + }) + } + + sendBuf({ requestId = 0, buf, type = MessagePayloadType.Request, contentType, dataType }) { + const traceId = requestId ? requestId : genTraceId() + + return this.sendHmProtocol({ + requestId: traceId, + dataBin: buf, + type, + contentType, + dataType + }) + } + + sendLog(str) { + const packageBuf = str2buf(str) + this.sendSimpleProtocol( + { + dataBin: packageBuf + }, + { + messageType: MessageType.Log + } + ) + } + + sendDataWithSession({ traceId, spanId, seqId, payload, type, opCode, totalLength, contentType, dataType }, { messageType }) { + const payloadBin = this.buildPayload({ + traceId, + spanId, + seqId, + totalLength, + type, + opCode, + payload, + contentType, + dataType + }) + + let data = this.isDevice + ? this.buildData(payloadBin, { + type: messageType + }) + : payloadBin + + this.sendMsg(data) + } + + sendSimpleData({ payload }, { messageType }) { + let data = this.isDevice + ? this.buildData(payload, { + type: messageType + }) + : payload + + this._logSend(data) + } + + buildPayload(data) { + const size = HM_MESSAGE_PROTO_HEADER + data.payload.byteLength + let buf = Buffer.alloc(size) + let offset = 0 + + // header + // traceId + buf.writeUInt32LE(data.traceId, offset) + offset += 4 + + // parentId + buf.writeUInt32LE(0, offset) + offset += 4 + + // spanId + buf.writeUInt32LE(data.spanId, offset) + offset += 4 + + // seqId // 顺序 id,消息部分顺序序列号 + buf.writeUInt32LE(data.seqId, offset) + offset += 4 + + // message total length + buf.writeUInt32LE(data.totalLength, offset) + offset += 4 + + // payload length 当前 + buf.writeUInt32LE(data.payload.byteLength, offset) + offset += 4 + + // payload type + buf.writeUInt8(data.type, offset) + offset += 1 + + // opCode + buf.writeUInt8(data.opCode, offset) + offset += 1 + + // timestamp1 + buf.writeUInt32LE(this.now(), offset) + offset += 4 + + // timestamp2 + buf.writeUInt32LE(0, offset) + offset += 4 + + // timestamp3 + buf.writeUInt32LE(0, offset) + offset += 4 + + // timestamp4 + buf.writeUInt32LE(0, offset) + offset += 4 + + // timestamp5 + buf.writeUInt32LE(0, offset) + offset += 4 + + // timestamp6 + buf.writeUInt32LE(0, offset) + offset += 4 + + // timestamp7 + buf.writeUInt32LE(0, offset) + offset += 4 + + // request content data type + buf.writeUInt8(data.contentType, offset) + offset += 1 + + // response data type + buf.writeUInt8(data.dataType, offset) + offset += 1 + + buf.writeUInt16LE(0, offset) + offset += 2 + + // extra1 + buf.writeUInt32LE(0, offset) + offset += 4 + + // extra2 + buf.writeUInt32LE(0, offset) + offset += 4 + + // payload + buf.fill(data.payload, offset, data.payload.byteLength + offset) + + return buf + } + + readPayload(arrayBuf) { + const buf = Buffer.from(arrayBuf) + let offset = 0 + + const traceId = buf.readUInt32LE(offset) + offset += 4 + + const parentId = buf.readUInt32LE(offset) + offset += 4 + + const spanId = buf.readUInt32LE(offset) + offset += 4 + + const seqId = buf.readUInt32LE(offset) + offset += 4 + + const totalLength = buf.readUInt32LE(offset) + offset += 4 + + const payloadLength = buf.readUInt32LE(offset) + offset += 4 + + const payloadType = buf.readUInt8(offset) + offset += 1 + + const opCode = buf.readUInt8(offset) + offset += 1 + + const timestamp1 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp2 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp3 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp4 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp5 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp6 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp7 = buf.readUInt32LE(offset) + offset += 4 + + // request data type + const contentType = buf.readUInt8(offset) + offset += 1 + + // response data type + const dataType = buf.readUInt8(offset) + offset += 1 + + const extra1 = buf.readUInt16LE(offset) + offset += 2 + + const extra2 = buf.readUInt32LE(offset) + offset += 4 + + const extra3 = buf.readUInt32LE(offset) + offset += 4 + + const payload = buf.subarray(offset) + + return { + traceId, + parentId, + spanId, + seqId, + totalLength, + payloadLength, + payloadType, + opCode, + contentType, + dataType, + timestamp1, + timestamp2, + timestamp3, + timestamp4, + timestamp5, + timestamp6, + timestamp7, + extra1, + extra2, + extra3, + payload + } + } + + onFragmentData(bin) { + const data = this.readBin(bin) + this.emit('raw', bin) + + DEBUG && logger.debug('receive data=>', JSON.stringify(data)) + if (data.flag === MessageFlag.App && data.type === MessageType.Shake) { + this.appSidePort = data.port2 + logger.debug('appSidePort=>', data.port2) + this.shakeTask.resolve() + } else if (data.flag === MessageFlag.App && data.type === MessageType.Data && data.port2 === this.appSidePort) { + this.emit('message', data.payload) + this.emit('read', data) + } else if (data.flag === MessageFlag.App && data.type === MessageType.DataWithSystemTool && data.port2 === this.appSidePort) { + this.emit('message', data.payload) + this.emit('read', data) + } else if (data.flag === MessageFlag.App && data.type === MessageType.Log && data.port2 === this.appSidePort) { + this.emit('log', data.payload) + } else { + logger.error('error appSidePort=>%d data=>%j', this.appSidePort, data) + } + } + + errorIfBleDisconnect() { } + + onMessage(messagePayload) { + const payload = this.readPayload(messagePayload) + let session = this.sessionMgr.getById(payload.traceId, payload.payloadType) + + if (!session) { + session = this.sessionMgr.newSession(payload.traceId, payload.payloadType, this) + + // TODO: 需要考虑缓冲,监听回调要放到启动之前,或者没有增加监听就缓存请求 + session.on('data', (fullPayload) => { + if (fullPayload.opCode === MessagePayloadOpCode.Finished) { + if (fullPayload.payloadType === MessagePayloadType.Request) { + this.emit('request', { + request: fullPayload, + response: ({ data }) => { + this.response({ + requestId: fullPayload.traceId, + contentType: fullPayload.contentType, + dataType: fullPayload.dataType, + data + }) + } + }) + } else if (fullPayload.payloadType === MessagePayloadType.Response) { + this.emit('response', fullPayload) + } else if (fullPayload.payloadType === MessagePayloadType.Notify) { + this.emit('call', fullPayload) + } + + this.emit('data', fullPayload) + this.sessionMgr.destroy(session) + } + }) + + session.on('error', (error) => { + this.sessionMgr.destroy(session) + this.emit('error', error) + }) + } + + session.addChunk(payload) + } + + /** + * 发送请求 + * @param {object buffer arraybuffer arraybuffer like} data 传输的数据 + * @param {*} opts + * @returns + */ + request(data, opts) { + const _request = () => { + const defaultOpts = { + timeout: 60000, + contentType: 'json', + dataType: 'json' + } + const requestId = genTraceId() + const defer = Deferred() + opts = Object.assign(defaultOpts, opts) + + const error = (error) => { + this.off('error', error) + defer.reject(error) + } + + const transact = ({ traceId, payload, dataType }) => { + this.errorIfBleDisconnect() + DEBUG && logger.debug('traceId=>%d payload=>%s', traceId, payload.toString('hex')) + if (traceId === requestId) { + let result + switch (dataType) { + case MessagePayloadDataTypeOp.TEXT: + result = buf2str(payload) + break + case MessagePayloadDataTypeOp.BIN: + result = payload + break + case MessagePayloadDataTypeOp.JSON: + result = buf2json(payload) + break + default: // text + result = buf2str(payload) + break + } + DEBUG && logger.debug('request id=>%d payload=>%j', requestId, data) + DEBUG && logger.debug('response id=>%d payload=>%j', requestId, result) + + this.off('response', transact) + this.off('error', error) + defer.resolve(result) + } + } + + this.on('response', transact) + this.on('error', error) + if (Buffer.isBuffer(data)) { + this.sendBuf({ + requestId, + buf: data, + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: getDataType(opts.dataType) + }) + } else if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + this.sendBuf({ + requestId, + buf: Buffer.from(data), + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: getDataType(opts.dataType) + }) + } else { + this.sendJson({ + requestId, + json: data, + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.JSON, + dataType: getDataType(opts.dataType) + }) + } + + let hasReturned = false + + return Promise.race([ + timeout(opts.timeout, (resolve, reject) => { + if (hasReturned) { + return resolve() + } + + DEBUG && logger.error(`request timeout in ${opts.timeout}ms error=> %d data=> %j`, requestId, data) + this.off('response', transact) + + reject(Error(`Timed out in ${opts.timeout}ms.`)) + }), + defer.promise.finally(() => { + hasReturned = true + }) + ]) + } + + return this.waitingShakePromise.then(_request) + } + + requestCb(data, opts, cb) { + const _requestCb = () => { + const defaultOpts = { + timeout: 60000, + contentType: 'json', + dataType: 'json' + } + + if (typeof opts === 'function') { + cb = opts + opts = defaultOpts + } else { + opts = Object.assign(defaultOpts, opts) + } + + const requestId = genTraceId() + let timer1 = null + let hasReturned = false + + const transact = ({ traceId, payload, dataType }) => { + DEBUG && logger.debug('traceId=>%d payload=>%s', traceId, payload.toString('hex')) + if (traceId === requestId) { + let result + switch (dataType) { + case MessagePayloadDataTypeOp.TEXT: + result = buf2str(payload) + break + case MessagePayloadDataTypeOp.BIN: + result = payload + break + case MessagePayloadDataTypeOp.JSON: + result = buf2json(payload) + break + default: // text + result = buf2str(payload) + break + } + DEBUG && logger.debug('request id=>%d payload=>%j', requestId, data) + DEBUG && logger.debug('response id=>%d payload=>%j', requestId, result) + + timer1 && clearTimeout(timer1) + timer1 = null + this.off('response', transact) + hasReturned = true + cb(null, result) + } + } + + this.on('response', transact) + if (Buffer.isBuffer(data)) { + this.sendBuf({ + requestId, + buf: data, + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: getDataType(opts.dataType) + }) + } else if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + this.sendBuf({ + requestId, + buf: Buffer.from(data), + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: getDataType(opts.dataType) + }) + } else { + this.sendJson({ + requestId, + json: data, + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.JSON, + dataType: getDataType(opts.dataType) + }) + } + + timer1 = setTimeout(() => { + timer1 = null + if (hasReturned) { + return + } + + DEBUG && logger.error(`request time out in ${opts.timeout}ms error=>%d data=>%j`, requestId, data) + this.off('response', transact) + cb(Error(`Timed out in ${opts.timeout}ms.`)) + }, opts.timeout) + } + + return this.waitingShakePromise.then(_requestCb) + } + + /** + * 相应接口给当前请求 + * @param {obj} param0 + */ + response({ requestId, contentType, dataType, data }) { + if (MessagePayloadDataTypeOp.BIN === dataType) { + this.sendBuf({ + requestId, + buf: data, + type: MessagePayloadType.Response, + contentType, + dataType + }) + } else { + this.sendJson({ + requestId, + json: data, + type: MessagePayloadType.Response, + contentType, + dataType + }) + } + } + + /** + * call 模式调用接口到伴生服务 + * @param {json | buffer} data + * @returns + */ + call(data) { + return this.waitingShakePromise.then(() => { + if (Buffer.isBuffer(data)) { + return this.sendBuf({ + buf: data, + type: MessagePayloadType.Notify, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: MessagePayloadDataTypeOp.EMPTY + }) + } else if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return this.sendBuf({ + buf: Buffer.from(data), + type: MessagePayloadType.Notify, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: MessagePayloadDataTypeOp.EMPTY + }) + } else { + return this.sendJson({ + json: data, + type: MessagePayloadType.Notify, + contentType: MessagePayloadDataTypeOp.JSON, + dataType: MessagePayloadDataTypeOp.EMPTY + }) + } + }) + } +} diff --git a/shared/message.js b/shared/message.js new file mode 100644 index 0000000..4dcc8fe --- /dev/null +++ b/shared/message.js @@ -0,0 +1,1151 @@ +import { EventBus } from '@zos/utils' +import { log as Logger } from '@zos/utils' +import { Deferred, timeout } from './defer' +import { json2buf, buf2json, bin2hex, str2buf, buf2str } from './data' + +let logger + +function initLogger() { + if (typeof __ZEPPOS__ !== 'undefined') { + logger = Logger.getLogger('device-message') + // logger.level = logger.levels.warn + } else { + logger = Logger.getLogger('side-message') + } +} + +const DEBUG = true + +export const MESSAGE_SIZE = 3600 +export const MESSAGE_HEADER = 16 +export const MESSAGE_PAYLOAD = MESSAGE_SIZE - MESSAGE_HEADER +export const HM_MESSAGE_PROTO_HEADER = 66 +export const HM_MESSAGE_PROTO_PAYLOAD = MESSAGE_PAYLOAD - HM_MESSAGE_PROTO_HEADER + +export const MessageFlag = { + Runtime: 0x0, + App: 0x1 +} + +export const MessageType = { + Shake: 0x1, + Close: 0x2, + Heart: 0x3, + Data: 0x4, + DataWithSystemTool: 0x5, + Log: 0x6 +} + +export const MessageRuntimeType = { + Invoke: 0x1 +} +export const MessageVersion = { + Version1: 0x1 +} + +export const MessagePayloadType = { + Request: 0x1, + Response: 0x2, + Notify: 0x3 +} + +export const DataType = { + empty: 'empty', + json: 'json', + text: 'text', + bin: 'bin' +} + +export const MessagePayloadDataTypeOp = { + EMPTY: 0x0, + TEXT: 0x1, + JSON: 0x2, + BIN: 0x3 +} + +export function getDataType(type) { + switch (type.toLowerCase()) { + case DataType.json: + return MessagePayloadDataTypeOp.JSON + case DataType.text: + return MessagePayloadDataTypeOp.TEXT + case DataType.bin: + return MessagePayloadDataTypeOp.BIN + case DataType.empty: + return MessagePayloadDataTypeOp.EMPTY + default: + return MessagePayloadDataTypeOp.TEXT + } +} + +// 中续,结束 +export const MessagePayloadOpCode = { + Continued: 0x0, + Finished: 0x1 +} + +let traceId = 10000 +export function genTraceId() { + return traceId++ +} + +let spanId = 1000 +export function genSpanId() { + return spanId++ +} + +export function getTimestamp(t = Date.now()) { + return t % 10000000 +} + +class Session extends EventBus { + constructor(id, type, ctx) { + super() + this.id = id + this.type = type // payloadType + this.ctx = ctx + this.tempBuf = null + this.chunks = [] + this.count = 0 + this.finishChunk = null + } + + addChunk(payload) { + if (payload.opCode === MessagePayloadOpCode.Finished) { + this.count = payload.seqId + this.finishChunk = payload + } + + if (payload.payloadLength !== payload.payload.byteLength) { + logger.error('receive chunk data length error, expect %d but %d', payload.payloadLength, payload.payload.byteLength) + this.emit('error', Error(`receive chunk data length error, expect ${payload.payloadLength} but ${payload.payload.byteLength}`)) + return + } + + this.chunks.push(payload) + this.checkIfReceiveAllChunks() + } + + checkIfReceiveAllChunks() { + if (this.count !== this.chunks.length) return + + for (let i = 1; i <= this.count; i++) { + const chunk = this.chunks.find((c) => c.seqId === i) + + if (!chunk) { + this.releaseBuf() + this.emit('error', Error('receive data error')) + return + } + + const buf = chunk.payload + this.tempBuf = this.tempBuf ? Buffer.concat([this.tempBuf, buf]) : buf + } + + if (!this.finishChunk) return + + this.finishChunk.payload = this.tempBuf + this.finishChunk.payloadLength = this.finishChunk.payload.byteLength + + if (this.finishChunk.totalLength !== this.finishChunk.payloadLength) { + logger.error('receive full data length error, expect %d but %d', this.finishChunk.payloadLength, this.finishChunk.payload.byteLength) + this.emit('error', Error(`receive full data length error, expect ${this.finishChunk.payloadLength} but ${this.finishChunk.payload.byteLength}`)) + return + } + + this.emit('data', this.finishChunk) + } + + getLength() { + return this.tempBufLength + } + releaseBuf() { + this.tempBuf = null + this.chunks = [] + this.finishChunk = null + this.count = 0 + } +} + +class SessionMgr { + constructor() { + this.sessions = new Map() + } + + key(session) { + return `${session.id}:${session.type}` + } + + newSession(id, type, ctx) { + const newSession = new Session(id, type, ctx) + this.sessions.set(this.key(newSession), newSession) + return newSession + } + + destroy(session) { + session.releaseBuf() + this.sessions.delete(this.key(session)) + } + + has(id, type) { + return this.sessions.has( + this.key({ + id, + type + }) + ) + } + + getById(id, type) { + return this.sessions.get( + this.key({ + id, + type + }) + ) + } + + clear() { + this.sessions.clear() + } +} + +export class MessageBuilder extends EventBus { + constructor( + { appId = 0, appDevicePort = 20, appSidePort = 0, ble = undefined } = { + appId: 0, + appDevicePort: 20, + appSidePort: 0, + ble: undefined + } + ) { + super() + initLogger() + this.isDevice = typeof __ZEPPOS__ !== 'undefined' + this.isSide = !this.isDevice + + this.appId = appId + this.appDevicePort = appDevicePort + this.appSidePort = appSidePort + this.ble = ble + this.sendMsg = this.getSafeSend() + this.chunkSize = MESSAGE_PAYLOAD + this.tempBuf = null + this.shakeTask = Deferred() + this.waitingShakePromise = this.shakeTask.promise + this.sessionMgr = new SessionMgr() + } + + getMessageSize() { + return MESSAGE_SIZE + } + + getMessagePayloadSize() { + return MESSAGE_PAYLOAD + } + + getMessageHeaderSize() { + return MESSAGE_HEADER + } + + buf2Json(buf) { + return buf2json(buf) + } + + json2Buf(json) { + return json2buf(json) + } + + now(t = Date.now()) { + return getTimestamp(t) + } + + connect(cb) { + this.on('message', (message) => { + this.onMessage(message) + }) + + this.ble && + this.ble.createConnect((index, data, size) => { + DEBUG && logger.warn('[RAW] [R] receive index=>%d size=>%d bin=>%s', index, size, bin2hex(data)) + this.onFragmentData(data) + }) + + this.sendShake() + cb && cb(this) + } + + disConnect(cb) { + logger.debug('app ble disconnect') + this.sendClose() + this.off('message') + this.ble && this.ble.disConnect() + + cb && cb(this) + } + + listen(cb) { + if (typeof messaging === 'undefined') { + return + } + + messaging && + messaging.peerSocket.addListener('message', (message) => { + DEBUG && logger.warn('[RAW] [R] receive size=>%d bin=>%s', message.byteLength, bin2hex(message)) + this.onMessage(message) + }) + + this.waitingShakePromise = Promise.resolve() + cb && cb(this) + } + + buildBin(data) { + if (data.payload.byteLength > this.chunkSize) { + throw new Error(`${data.payload.byteLength} greater than max size of ${this.chunkSize}`) + } + + const size = this.getMessageHeaderSize() + data.payload.byteLength + let buf = Buffer.alloc(size) + let offset = 0 + + buf.writeUInt8(data.flag, offset) + offset += 1 + + buf.writeUInt8(data.version, offset) + offset += 1 + + buf.writeUInt16LE(data.type, offset) + offset += 2 + + buf.writeUInt16LE(data.port1, offset) + offset += 2 + + buf.writeUInt16LE(data.port2, offset) + offset += 2 + + buf.writeUInt32LE(data.appId, offset) + offset += 4 + + buf.writeUInt32LE(data.extra, offset) + offset += 4 + + buf.fill(data.payload, offset, data.payload.byteLength + offset) + + return buf + } + + buildShake() { + return this.buildBin({ + flag: MessageFlag.App, + version: MessageVersion.Version1, + type: MessageType.Shake, + port1: this.appDevicePort, + port2: this.appSidePort, + appId: this.appId, + extra: 0, + payload: Buffer.from([this.appId]) + }) + } + + sendShake() { + if (this.appSidePort === 0) { + const shake = this.buildShake() + this.sendMsg(shake) + } + } + + buildClose() { + return this.buildBin({ + flag: MessageFlag.App, + version: MessageVersion.Version1, + type: MessageType.Close, + port1: this.appDevicePort, + port2: this.appSidePort, + appId: this.appId, + extra: 0, + payload: Buffer.from([this.appId]) + }) + } + + sendClose() { + if (this.appSidePort !== 0) { + const close = this.buildClose() + + this.sendMsg(close) + } + } + + readBin(arrayBuf) { + const buf = Buffer.from(arrayBuf) + let offset = 0 + + const flag = buf.readUInt8(offset) + offset += 1 + + const version = buf.readUInt8(offset) + offset += 1 + + const type = buf.readUInt16LE(offset) + offset += 2 + + const port1 = buf.readUInt16LE(offset) + offset += 2 + + const port2 = buf.readUInt16LE(offset) + offset += 2 + + const appId = buf.readUInt32LE(offset) + offset += 4 + + const extra = buf.readUInt32LE(offset) + offset += 4 + + const payload = buf.subarray(offset) + + return { + flag, + version, + type, + port1, + port2, + appId, + extra, + payload + } + } + + // opts 覆盖头部选项 + buildData(payload, opts = {}) { + return this.buildBin({ + flag: MessageFlag.App, + version: MessageVersion.Version1, + type: MessageType.Data, + port1: this.appDevicePort, + port2: this.appSidePort, + appId: this.appId, + extra: 0, + ...opts, + payload + }) + } + + sendBin(buf, debug = DEBUG) { + // ble 发送消息 + debug && logger.warn('[RAW] [S] send size=%d bin=%s', buf.byteLength, bin2hex(buf.buffer)) + const result = this.ble.send(buf.buffer, buf.byteLength) + + if (!result) { + throw Error('send message error') + } + } + + sendBinBySide(buf, debug = DEBUG) { + // side 发送消息 + debug && logger.warn('[RAW] [S] send size=%d bin=%s', buf.byteLength, bin2hex(buf.buffer)) + messaging.peerSocket.send(buf.buffer) + } + + // 通用获取逻辑 + getSafeSend() { + if (this.isDevice) { + return this.sendBin.bind(this) + } else { + return this.sendBinBySide.bind(this) + } + } + + _logSend(buf) { + this.sendMsg(buf, false) + } + + // 大数据的复杂头部分包协议 + sendHmProtocol({ requestId, dataBin, type, contentType, dataType }, { messageType = MessageType.Data } = {}) { + const headerSize = 0 + const hmDataSize = HM_MESSAGE_PROTO_PAYLOAD + const userDataLength = dataBin.byteLength + + let offset = 0 + const _buf = Buffer.alloc(hmDataSize) + const traceId = requestId ? requestId : genTraceId() + const spanId = genSpanId() + let seqId = 1 + + const count = Math.ceil(userDataLength / hmDataSize) + + function genSeqId() { + return seqId++ + } + + for (let i = 1; i <= count; i++) { + this.errorIfBleDisconnect() + if (i === count) { + // last + const tailSize = userDataLength - offset + const tailBuf = Buffer.alloc(headerSize + tailSize) + + dataBin.copy(tailBuf, headerSize, offset, offset + tailSize) + offset += tailSize + this.sendDataWithSession( + { + traceId, + spanId: spanId, + seqId: genSeqId(), + payload: tailBuf, + type, + opCode: MessagePayloadOpCode.Finished, + totalLength: userDataLength, + contentType, + dataType + }, + { + messageType + } + ) + + break + } + + dataBin.copy(_buf, headerSize, offset, offset + hmDataSize) + offset += hmDataSize + + this.sendDataWithSession( + { + traceId, + spanId: spanId, + seqId: genSeqId(), + payload: _buf, + type, + opCode: MessagePayloadOpCode.Continued, + totalLength: userDataLength, + contentType, + dataType + }, + { + messageType + } + ) + } + + if (offset === userDataLength) { + DEBUG && logger.debug('HmProtocol send ok msgSize=> %d dataSize=> %d', offset, userDataLength) + } else { + DEBUG && logger.error('HmProtocol send error msgSize=> %d dataSize=> %d', offset, userDataLength) + } + } + + // 大数据的简单分包协议 + sendSimpleProtocol({ dataBin }, { messageType = MessageType.Data } = {}) { + const dataSize = this.chunkSize + const headerSize = 0 + const userDataLength = dataBin.byteLength + + let offset = 0 + const _buf = Buffer.alloc(dataSize) + + const count = Math.ceil(userDataLength / dataSize) + + for (let i = 1; i <= count; i++) { + if (i === count) { + // last + const tailSize = userDataLength - offset + const tailBuf = Buffer.alloc(headerSize + tailSize) + + dataBin.copy(tailBuf, headerSize, offset, offset + tailSize) + offset += tailSize + this.sendSimpleData( + { + payload: tailBuf + }, + { + messageType + } + ) + + break + } + + dataBin.copy(_buf, headerSize, offset, offset + dataSize) + offset += dataSize + + this.sendSimpleData( + { + payload: _buf + }, + { + messageType + } + ) + } + + if (offset === userDataLength) { + // logger.debug('SimpleProtocol send ok msgSize=> %d dataSize=> %d', offset, userDataLength) + } else { + // logger.error('SimpleProtocol send error msgSize=> %d dataSize=> %d', offset, userDataLength) + } + } + + sendJson({ requestId = 0, json, type = MessagePayloadType.Request, contentType, dataType }) { + const packageBin = json2buf(json) + const traceId = requestId ? requestId : genTraceId() + + this.sendHmProtocol({ + requestId: traceId, + dataBin: packageBin, + type, + contentType, + dataType + }) + } + + sendBuf({ requestId = 0, buf, type = MessagePayloadType.Request, contentType, dataType }) { + const traceId = requestId ? requestId : genTraceId() + + return this.sendHmProtocol({ + requestId: traceId, + dataBin: buf, + type, + contentType, + dataType + }) + } + + sendLog(str) { + const packageBuf = str2buf(str) + this.sendSimpleProtocol( + { + dataBin: packageBuf + }, + { + messageType: MessageType.Log + } + ) + } + + sendDataWithSession({ traceId, spanId, seqId, payload, type, opCode, totalLength, contentType, dataType }, { messageType }) { + const payloadBin = this.buildPayload({ + traceId, + spanId, + seqId, + totalLength, + type, + opCode, + payload, + contentType, + dataType + }) + + let data = this.isDevice + ? this.buildData(payloadBin, { + type: messageType + }) + : payloadBin + + this.sendMsg(data) + } + + sendSimpleData({ payload }, { messageType }) { + let data = this.isDevice + ? this.buildData(payload, { + type: messageType + }) + : payload + + this._logSend(data) + } + + buildPayload(data) { + const size = HM_MESSAGE_PROTO_HEADER + data.payload.byteLength + let buf = Buffer.alloc(size) + let offset = 0 + + // header + // traceId + buf.writeUInt32LE(data.traceId, offset) + offset += 4 + + // parentId + buf.writeUInt32LE(0, offset) + offset += 4 + + // spanId + buf.writeUInt32LE(data.spanId, offset) + offset += 4 + + // seqId // 顺序 id,消息部分顺序序列号 + buf.writeUInt32LE(data.seqId, offset) + offset += 4 + + // message total length + buf.writeUInt32LE(data.totalLength, offset) + offset += 4 + + // payload length 当前 + buf.writeUInt32LE(data.payload.byteLength, offset) + offset += 4 + + // payload type + buf.writeUInt8(data.type, offset) + offset += 1 + + // opCode + buf.writeUInt8(data.opCode, offset) + offset += 1 + + // timestamp1 + buf.writeUInt32LE(this.now(), offset) + offset += 4 + + // timestamp2 + buf.writeUInt32LE(0, offset) + offset += 4 + + // timestamp3 + buf.writeUInt32LE(0, offset) + offset += 4 + + // timestamp4 + buf.writeUInt32LE(0, offset) + offset += 4 + + // timestamp5 + buf.writeUInt32LE(0, offset) + offset += 4 + + // timestamp6 + buf.writeUInt32LE(0, offset) + offset += 4 + + // timestamp7 + buf.writeUInt32LE(0, offset) + offset += 4 + + // request content data type + buf.writeUInt8(data.contentType, offset) + offset += 1 + + // response data type + buf.writeUInt8(data.dataType, offset) + offset += 1 + + buf.writeUInt16LE(0, offset) + offset += 2 + + // extra1 + buf.writeUInt32LE(0, offset) + offset += 4 + + // extra2 + buf.writeUInt32LE(0, offset) + offset += 4 + + // payload + buf.fill(data.payload, offset, data.payload.byteLength + offset) + + return buf + } + + readPayload(arrayBuf) { + const buf = Buffer.from(arrayBuf) + let offset = 0 + + const traceId = buf.readUInt32LE(offset) + offset += 4 + + const parentId = buf.readUInt32LE(offset) + offset += 4 + + const spanId = buf.readUInt32LE(offset) + offset += 4 + + const seqId = buf.readUInt32LE(offset) + offset += 4 + + const totalLength = buf.readUInt32LE(offset) + offset += 4 + + const payloadLength = buf.readUInt32LE(offset) + offset += 4 + + const payloadType = buf.readUInt8(offset) + offset += 1 + + const opCode = buf.readUInt8(offset) + offset += 1 + + const timestamp1 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp2 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp3 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp4 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp5 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp6 = buf.readUInt32LE(offset) + offset += 4 + + const timestamp7 = buf.readUInt32LE(offset) + offset += 4 + + // request data type + const contentType = buf.readUInt8(offset) + offset += 1 + + // response data type + const dataType = buf.readUInt8(offset) + offset += 1 + + const extra1 = buf.readUInt16LE(offset) + offset += 2 + + const extra2 = buf.readUInt32LE(offset) + offset += 4 + + const extra3 = buf.readUInt32LE(offset) + offset += 4 + + const payload = buf.subarray(offset) + + return { + traceId, + parentId, + spanId, + seqId, + totalLength, + payloadLength, + payloadType, + opCode, + contentType, + dataType, + timestamp1, + timestamp2, + timestamp3, + timestamp4, + timestamp5, + timestamp6, + timestamp7, + extra1, + extra2, + extra3, + payload + } + } + + onFragmentData(bin) { + const data = this.readBin(bin) + this.emit('raw', bin) + + DEBUG && logger.debug('receive data=>', JSON.stringify(data)) + if (data.flag === MessageFlag.App && data.type === MessageType.Shake) { + this.appSidePort = data.port2 + logger.debug('appSidePort=>', data.port2) + this.shakeTask.resolve() + } else if (data.flag === MessageFlag.App && data.type === MessageType.Data && data.port2 === this.appSidePort) { + this.emit('message', data.payload) + this.emit('read', data) + } else if (data.flag === MessageFlag.App && data.type === MessageType.DataWithSystemTool && data.port2 === this.appSidePort) { + this.emit('message', data.payload) + this.emit('read', data) + } else if (data.flag === MessageFlag.App && data.type === MessageType.Log && data.port2 === this.appSidePort) { + this.emit('log', data.payload) + } else { + logger.error('error appSidePort=>%d data=>%j', this.appSidePort, data) + } + } + + errorIfBleDisconnect() { } + + onMessage(messagePayload) { + const payload = this.readPayload(messagePayload) + let session = this.sessionMgr.getById(payload.traceId, payload.payloadType) + + if (!session) { + session = this.sessionMgr.newSession(payload.traceId, payload.payloadType, this) + + // TODO: 需要考虑缓冲,监听回调要放到启动之前,或者没有增加监听就缓存请求 + session.on('data', (fullPayload) => { + if (fullPayload.opCode === MessagePayloadOpCode.Finished) { + if (fullPayload.payloadType === MessagePayloadType.Request) { + this.emit('request', { + request: fullPayload, + response: ({ data }) => { + this.response({ + requestId: fullPayload.traceId, + contentType: fullPayload.contentType, + dataType: fullPayload.dataType, + data + }) + } + }) + } else if (fullPayload.payloadType === MessagePayloadType.Response) { + this.emit('response', fullPayload) + } else if (fullPayload.payloadType === MessagePayloadType.Notify) { + this.emit('call', fullPayload) + } + + this.emit('data', fullPayload) + this.sessionMgr.destroy(session) + } + }) + + session.on('error', (error) => { + this.sessionMgr.destroy(session) + this.emit('error', error) + }) + } + + session.addChunk(payload) + } + + /** + * 发送请求 + * @param {object buffer arraybuffer arraybuffer like} data 传输的数据 + * @param {*} opts + * @returns + */ + request(data, opts) { + const _request = () => { + const defaultOpts = { + timeout: 60000, + contentType: 'json', + dataType: 'json' + } + const requestId = genTraceId() + const defer = Deferred() + opts = Object.assign(defaultOpts, opts) + + const error = (error) => { + this.off('error', error) + defer.reject(error) + } + + const transact = ({ traceId, payload, dataType }) => { + this.errorIfBleDisconnect() + DEBUG && logger.debug('traceId=>%d payload=>%s', traceId, payload.toString('hex')) + if (traceId === requestId) { + let result + switch (dataType) { + case MessagePayloadDataTypeOp.TEXT: + result = buf2str(payload) + break + case MessagePayloadDataTypeOp.BIN: + result = payload + break + case MessagePayloadDataTypeOp.JSON: + result = buf2json(payload) + break + default: // text + result = buf2str(payload) + break + } + DEBUG && logger.debug('request id=>%d payload=>%j', requestId, data) + DEBUG && logger.debug('response id=>%d payload=>%j', requestId, result) + + this.off('response', transact) + this.off('error', error) + defer.resolve(result) + } + } + + this.on('response', transact) + this.on('error', error) + if (Buffer.isBuffer(data)) { + this.sendBuf({ + requestId, + buf: data, + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: getDataType(opts.dataType) + }) + } else if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + this.sendBuf({ + requestId, + buf: Buffer.from(data), + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: getDataType(opts.dataType) + }) + } else { + this.sendJson({ + requestId, + json: data, + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.JSON, + dataType: getDataType(opts.dataType) + }) + } + + let hasReturned = false + + return Promise.race([ + timeout(opts.timeout, (resolve, reject) => { + if (hasReturned) { + return resolve() + } + + DEBUG && logger.error(`request timeout in ${opts.timeout}ms error=> %d data=> %j`, requestId, data) + this.off('response', transact) + + reject(Error(`Timed out in ${opts.timeout}ms.`)) + }), + defer.promise.finally(() => { + hasReturned = true + }) + ]) + } + + return this.waitingShakePromise.then(_request) + } + + requestCb(data, opts, cb) { + const _requestCb = () => { + const defaultOpts = { + timeout: 60000, + contentType: 'json', + dataType: 'json' + } + + if (typeof opts === 'function') { + cb = opts + opts = defaultOpts + } else { + opts = Object.assign(defaultOpts, opts) + } + + const requestId = genTraceId() + let timer1 = null + let hasReturned = false + + const transact = ({ traceId, payload, dataType }) => { + DEBUG && logger.debug('traceId=>%d payload=>%s', traceId, payload.toString('hex')) + if (traceId === requestId) { + let result + switch (dataType) { + case MessagePayloadDataTypeOp.TEXT: + result = buf2str(payload) + break + case MessagePayloadDataTypeOp.BIN: + result = payload + break + case MessagePayloadDataTypeOp.JSON: + result = buf2json(payload) + break + default: // text + result = buf2str(payload) + break + } + DEBUG && logger.debug('request id=>%d payload=>%j', requestId, data) + DEBUG && logger.debug('response id=>%d payload=>%j', requestId, result) + + timer1 && clearTimeout(timer1) + timer1 = null + this.off('response', transact) + hasReturned = true + cb(null, result) + } + } + + this.on('response', transact) + if (Buffer.isBuffer(data)) { + this.sendBuf({ + requestId, + buf: data, + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: getDataType(opts.dataType) + }) + } else if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + this.sendBuf({ + requestId, + buf: Buffer.from(data), + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: getDataType(opts.dataType) + }) + } else { + this.sendJson({ + requestId, + json: data, + type: MessagePayloadType.Request, + contentType: MessagePayloadDataTypeOp.JSON, + dataType: getDataType(opts.dataType) + }) + } + + timer1 = setTimeout(() => { + timer1 = null + if (hasReturned) { + return + } + + DEBUG && logger.error(`request time out in ${opts.timeout}ms error=>%d data=>%j`, requestId, data) + this.off('response', transact) + cb(Error(`Timed out in ${opts.timeout}ms.`)) + }, opts.timeout) + } + + return this.waitingShakePromise.then(_requestCb) + } + + /** + * 相应接口给当前请求 + * @param {obj} param0 + */ + response({ requestId, contentType, dataType, data }) { + if (MessagePayloadDataTypeOp.BIN === dataType) { + this.sendBuf({ + requestId, + buf: data, + type: MessagePayloadType.Response, + contentType, + dataType + }) + } else { + this.sendJson({ + requestId, + json: data, + type: MessagePayloadType.Response, + contentType, + dataType + }) + } + } + + /** + * call 模式调用接口到伴生服务 + * @param {json | buffer} data + * @returns + */ + call(data) { + return this.waitingShakePromise.then(() => { + if (Buffer.isBuffer(data)) { + return this.sendBuf({ + buf: data, + type: MessagePayloadType.Notify, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: MessagePayloadDataTypeOp.EMPTY + }) + } else if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { + return this.sendBuf({ + buf: Buffer.from(data), + type: MessagePayloadType.Notify, + contentType: MessagePayloadDataTypeOp.BIN, + dataType: MessagePayloadDataTypeOp.EMPTY + }) + } else { + return this.sendJson({ + json: data, + type: MessagePayloadType.Notify, + contentType: MessagePayloadDataTypeOp.JSON, + dataType: MessagePayloadDataTypeOp.EMPTY + }) + } + }) + } +} diff --git a/utils/config/constants.js b/utils/config/constants.js new file mode 100644 index 0000000..60a5029 --- /dev/null +++ b/utils/config/constants.js @@ -0,0 +1,2 @@ +export const DEFAULT_COLOR = 0xfc6950; +export const DEFAULT_COLOR_TRANSPARENT = 0xfeb4a8; diff --git a/utils/config/device.js b/utils/config/device.js new file mode 100644 index 0000000..ea0d3df --- /dev/null +++ b/utils/config/device.js @@ -0,0 +1,2 @@ +import { getDeviceInfo } from "@zos/device"; +export const { width: DEVICE_WIDTH, height: DEVICE_HEIGHT } = getDeviceInfo();