文章目录
- 分布式数据对象管理框架是一款面向对象的内存数据管理框架,向应用开发者提供内存对象的创建、查询、删除、修改、订阅等基本数据对象的管理能力,同时具备分布式能力,满足超级终端场景下,相同应用多设备间的数据对象协同需求。
分布式数据对象提供JS接口,让开发者能以使用本地对象的方式使用分布式对象。分布式数据对象支持的数据类型包括数字型、字符型、布尔型等基本类型,同时也支持数组、基本类型嵌套等复杂类型。
约束与限制:不同设备间只有相同bundleName的应用才能直接同步;不建议创建过多分布式对象,每个分布式对象将占用100-150KB内存;每个对象大小不超过500KB;如对复杂类型的数据进行修改,仅支持修改根属性,暂不支持下级属性修改。
分布式数据对象依赖于分布式数据服务。
项目路径:https://gitee.com/openharmony/distributeddatamgr_objectstore。
- import distributedObject from '@ohos.data.distributedDataObject'
- 接口名称 描述 function createDistributedObject(source: object): DistributedObject; 建分布式对象source中指定分布式对象中的属性返回值是创建出的分布式对象,接口见DistrubutedObject。 接口名称 描述 function createDistributedObject(source: object): DistributedObject; 创建分布式对象source中指定分布式对象中的属性返回值是创建出的分布式对象,接口见DistrubutedObject。 function genSessionId(): string; 随机创建sessionId,返回值是随机创建的sessionId。
- 接口名称 描述 setSessionId(sessionId: string): boolean; 设置同步的sessionId,可信组网中有多个设备时,多个设备间的对象如果设置为同一个sessionId,就能自动同步sessionId是指定的sessionId,如果要退出分布式组网,设置为“”或不设置均可,返回值是操作结果,true标识设置session成功。 on(type: ‘change’, callback: Callback<{ sessionId: string, fields: Array }>): void; 监听对象的变更type,固定为’change’,callback是变更时触发的回调,回调参数sessionId标识变更对象的sessionId,fields标识对象变更的属性名。 off(type: ‘change’, callback: Callback<{ sessionId: string, fields: Array } 删除对象的变更监听,type固定为’change’,callback为可选参数,不设置表示删除该对象所有变更监听。 on(type: ‘status’, callback: Callback<{ sessionId: string, networkId: string, status: ‘online’ | ‘offline’ }>): void 监听对象的变更,type固定为’status’,callback是变更时触发的回调,回调参数sessionId标识变更对象的sessionId,networkId标识对象设备的networkId,status标识对象为’online’(上线)或’offline’(下线)的状态。 off(type: ‘status’, callback: Callback<{ sessionId: string, deviceId: string, status: ‘online’ | ‘offline’ }>): void 删除对象的变更监听,type固定为’change’,callback为可选参数,不设置表示删除该对象所有上下线监听。
- import distributedObject from '@ohos.data.distributedDataObject'
-
- JS接口C++部分frameworks\jskitsimpl\src\adaptor\js_module_init.cpp ```c++static napi_value DistributedDataObjectExport(napi_env env, napi_value exports){ napi_status status; static napi_property_descriptor desc[] = { DECLARE_NAPI_FUNCTION("createObjectSync", JSDistributedObjectStore::JSCreateObjectSync), DECLARE_NAPI_FUNCTION("destroyObjectSync", JSDistributedObjectStore::JSDestroyObjectSync), DECLARE_NAPI_FUNCTION("on", JSDistributedObjectStore::JSOn), DECLARE_NAPI_FUNCTION("off", JSDistributedObjectStore::JSOff), DECLARE_NAPI_FUNCTION("recordCallback", JSDistributedObjectStore::JSRecordCallback), DECLARE_NAPI_FUNCTION("deleteCallback", JSDistributedObjectStore::JSDeleteCallback), }; status = napi_define_properties(env, exports, sizeof(desc) / sizeof(desc[0]), desc); CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok); return exports;}// storage module definestatic napi_module storageModule = { .nm_version = 1, .nm_flags = 0, .nm_filename = nullptr, .nm_register_func = DistributedDataObjectExport, .nm_modname = "data.distributedDataObject", .nm_priv = ((void *)0), .reserved = { 0 },};// distributeddataobject module registerstatic __attribute__((constructor)) void RegisterModule(){ napi_module_register(&storageModule);}``` JS接口JS部分:interfaces\jskits\distributed_data_object.js。 ```javascriptclass Distributed { constructor(obj) { this.__proxy = obj; Object.keys(obj).forEach(key => { Object.defineProperty(this, key, { enumerable: true, configurable: true, get: function () { return this.__proxy[key]; }, set: function (newValue) { this.__proxy[key] = newValue; } }); }); Object.defineProperty(this, SESSION_ID, { enumerable: true, configurable: true, get: function () { return this.__proxy[SESSION_ID]; }, set: function (newValue) { this.__proxy[SESSION_ID] = newValue; } }); this.__objectId = randomNum(); console.info("constructor success "); }; setSessionId(sessionId) { if (sessionId == null || sessionId == "") { leaveSession(this.__proxy); return false; } if (this.__proxy[SESSION_ID] == sessionId) { console.info("same session has joined " + sessionId); return true; } leaveSession(this.__proxy); let object = joinSession(this.__proxy, this.__objectId, sessionId); if (object != null) { this.__proxy = object; return true; } return false; }; on(type, callback) { onWatch(type, this.__proxy, callback); distributedObject.recordCallback(type, this.__objectId, callback); }; off(type, callback) { offWatch(type, this.__proxy, callback); distributedObject.deleteCallback(type, this.__objectId, callback); }; __proxy; __objectId;}export default { createDistributedObject: newDistributed, genSessionId: randomNum}```
- ```javascript// 创建对象,对象包含3个基本属性:name,age和isVis;2个复杂属性:parent,listvar g_object = distributedObject.createDistributedObject({name:undefined, age:undefined, isVis:true, parent:undefined, list:undefined});``` 创建一个 Distributed 对象,此对象为node.js类 Distributed的对象。 此类有两个成员:__proxy 存储传入的参数对象,当前对象添加传入的参数各个key为属性名的属性,每个属性定义get和set方法;并加上 __sessionId 为属性名的属性,定义get和set属性。 __objectId 存储对象ID,此值为一个随机值。返回创建的对象。 ```javascriptconstructor(obj) { this.__proxy = obj; Object.keys(obj).forEach(key => { Object.defineProperty(this, key, { enumerable: true, configurable: true, get: function () { return this.__proxy[key]; }, set: function (newValue) { this.__proxy[key] = newValue; } }); }); Object.defineProperty(this, SESSION_ID, { enumerable: true, configurable: true, get: function () { return this.__proxy[SESSION_ID]; }, set: function (newValue) { this.__proxy[SESSION_ID] = newValue; } }); this.__objectId = randomNum(); console.info("constructor success ");};```
- 获取长度为8的由随机数字构成的字符串。
- 发起方 javascript g_object.setSessionId(distributedObject.genSessionId())。 被拉起方 javascript //sessionId与发起方的__sessionId一致 g_object.setSessionId(sessionId)。 设置同步的sessionId,多个可信组网中有多个设备时,多个设备间的对象如果设置为同一个sessionId,就能自动同步。如果要退出分布式组网,设置为“”或不设置均可。 如果sessionId和缓存存储的sessionId是一样的,直接返回true,如果不相同,先调用 distributedObject.destroyObjectSync(obj) 删除旧的sessionId,再调用 distributedObject.createObjectSync(sessionId, objectId) 利用sessionId创建新的数据对象。然后缓存存储新的sessionId。
-
- ```javascriptchangeCallback : function (sessionId, changeData) { console.info("change" + sessionId + " " + this.response); if (changeData != null && changeData != undefined) { changeData.forEach(element => { console.info("changed !" + element + " " + g_object[element]); }); }} g_object.on("change", this.changeCallback);``` 开启对对象数据变更的监听,type固定为’change’,callback是变更时触发的回调,回调参数sessionId标识变更对象的sessionId,fields标识对象变更的属性名。
- onWatch(type, this.__proxy, callback)。 distributedObject.on(type, obj, callback)调用JS接口C++ 部分的函数,参数type为’change’,obj为数据对象,callback为回调函数。 distributedObject.recordCallback(type, this.__objectId, callback)调用JS接口C++ 部分的函数,参数type为’change’,__objectId为数据对象的ID号,callback为回调函数。添加回调函数到全局回调map中。
- JSDistributedObjectStore::JSOn。
- 使用napi解析传过来的参数type,objectId,callbackType调用 AddCallback(env, g_changeCallBacks, objectId, argv[2]) 添加回调到对应的全局map回调中。 JSDistributedObjectStore::AddCallback 函数: c++ void JSDistributedObjectStore::AddCallback(napi_env env, std::map<std::string, std::list<napi_ref>> &callbacks, const std::string &objectId, napi_value callback) { LOG_INFO("add callback %{public}s", objectId.c_str()); napi_ref ref = nullptr; napi_status status = napi_create_reference(env, callback, 1, &ref); CHECK_EQUAL_WITH_RETURN_VOID(status, napi_ok); if (callbacks.count(objectId) != 0) { auto lists = callbacks.at(objectId); lists.push_back(ref); callbacks.insert_or_assign(objectId, lists); } else { std::list<napi_ref> lists = { ref }; callbacks.insert_or_assign(objectId, lists); } } napi_status status = napi_create_reference(env, callback, 1, &ref) 创建回调的引用,添加到key为mapobjectId对应的list中。即一个objectId可以对应多个回调引用,存储再一个list中。
- ```javascriptstatusCallback : function (sessionId, networkid, status) { this.response += "status changed " + sessionId + " " + status + " " + networkId;}g_object.on("status", this.changeCallback);``` 开启对对象上下线变更的监听,type固定为’status’,callback是变更时触发的回调,回调参数sessionId标识变更对象的sessionId,fields标识对象变更的属性名。
- onWatch(type, this.__proxy, callback)distributedObject.on(type, obj, callback)调用JS接口C++部分的函数,参数type为’status’,obj为数据对象,callback为回调函数。distributedObject.recordCallback(type, this.__objectId, callback)调用JS接口C++ 部分的函数,参数type为’status’,__objectId为数据对象的ID号,callback为回调函数。添加回调函数到全局回调map中。
- JSDistributedObjectStore::JSOn。
- 使用napi解析传过来的参数type,objectId,callbackType调用 AddCallback(env, g_statusCallBacks, objectId, argv[2]) 添加回调到对应的全局map回调中。
-
- ```javascript//删除变更回调changeCallbackg_object.off("change", changeCallback);//删除所有的变更回调g_object.off("change"); ``` 删除对数据对象的数据变更监听,type固定为’change’,callback为可选参数,不设置表示删除该对象所有变更监听。
- offWatch(type, this.__proxy, callback). distributedObject.off(type, obj, callback) 调用JS接口C++ 部分函数,参数type为’change’,obj为数据对象,callback为回调函数。 distributedObject.deleteCallback(type, this.__objectId, callback) 调用JS接口C++ 部分函数,参数type为’change’,__objectId为数据对象的ID号,callback为回调函数。从全局回调map中删除回调函数,要是callback为空,则删除此ID号的全部回调函数。
- JSDistributedObjectStore::JSOff。 使用napi解析获取传过来的参数type,使用napi_unwrap根据传入的参数获取之前绑定的wrapper对象。如果第三个参数为空,DeleteWatch参数handler为nullptr。 调用 JSWatcher::Off 进行删除。 参数handler为nullptr先调用 ChangeEventListener::Clear(napi_env env) 删除所有回调。 ```c++void ChangeEventListener::Clear(napi_env env){ EventListener::Clear(env); if (isWatched_ && object_ != nullptr) { uint32_t ret = objectStore_->UnWatch(object_); if (ret != SUCCESS) { LOG_ERROR("UnWatch %{public}s error", object_->GetSessionId().c_str()); } else { LOG_INFO("UnWatch %{public}s success", object_->GetSessionId().c_str()); isWatched_ = false; } }}``` 参数handler不为nullptr先调用 ChangeEventListener::Del(napi_env env, napi_value handler) 删除对应的handler。 ```c++bool ChangeEventListener::Del(napi_env env, napi_value handler){ bool isEmpty = EventListener::Del(env, handler); if (isEmpty && isWatched_ && object_ != nullptr) { uint32_t ret = objectStore_->UnWatch(object_); if (ret != SUCCESS) { LOG_ERROR("UnWatch %{public}s error", object_->GetSessionId().c_str()); } else { LOG_INFO("UnWatch %{public}s success", object_->GetSessionId().c_str()); isWatched_ = false; } } return isEmpty;}``` 再调用 *DistributedObjectStoreImpl::UnWatch(DistributedObject object) 删除分布式数据服务内的监听。 ```c++uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object){ if (object == nullptr) { LOG_ERROR("DistributedObjectStoreImpl::Sync object err "); return ERR_NULL_OBJECT; } if (flatObjectStore_ == nullptr) { LOG_ERROR("DistributedObjectStoreImpl::Sync object err "); return ERR_NULL_OBJECTSTORE; } uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId()); if (status != SUCCESS) { LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status); return status; } watchers_.erase(object); LOG_INFO("DistributedObjectStoreImpl:UnWatch object success."); return SUCCESS;}``````c++uint32_t FlatObjectStore::UnWatch(const std::string &sessionId){ if (!storageEngine_->isOpened_) { LOG_ERROR("FlatObjectStore::DB has not inited"); return ERR_DB_NOT_INIT; } uint32_t status = storageEngine_->UnRegisterObserver(sessionId); if (status != SUCCESS) { LOG_ERROR("FlatObjectStore::Watch failed %{public}d", status); } return status;}``````c++uint32_t FlatObjectStorageEngine::UnRegisterObserver(const std::string &key){ if (!isOpened_) { LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init"); return ERR_DB_NOT_INIT; } std::unique_lock<std::shared_mutex> lock(operationMutex_); if (delegates_.count(key) == 0) { LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str()); return ERR_DB_NOT_EXIST; } auto iter = observerMap_.find(key); if (iter == observerMap_.end()) { LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver observer not exist."); return ERR_NO_OBSERVER; } auto delegate = delegates_.at(key); std::shared_ptr<TableWatcher> watcher = iter->second; LOG_INFO("start UnRegisterObserver %{public}s", key.c_str()); DistributedDB::DBStatus status = delegate->UnRegisterObserver(watcher.get()); if (status != DistributedDB::DBStatus::OK) { LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status); return ERR_UNRIGSTER; } LOG_INFO("end UnRegisterObserver %{public}s", key.c_str()); observerMap_.erase(key); return SUCCESS;}``` JSDistributedObjectStore::JSDeleteCallback。 使用napi解析获取传过来的参数type,objectId如果参数为两个调用 DelCallback(env, g_changeCallBacks, objectId) 进行注册的回调函数的删除,如果有第三个参数调用 DelCallback(env, g_changeCallBacks, objectId, argv[2]) 进行注册的回调函数的删除。 对于函数DelCallback如果第三个参数为nullptr则清空此ID的所有注册的回调函数。 否则删除对应注册的回调函数。如果是最后一个则把这个ID对象也删除了。 ```c++void JSDistributedObjectStore::DelCallback(napi_env env, std::map<std::string, std::list<napi_ref>> &callbacks, const std::string &sessionId, napi_value callback){ LOG_INFO("del callback %{public}s", sessionId.c_str()); napi_status status; if (callback == nullptr) { if (callbacks.count(sessionId) != 0) { for (auto ref : callbacks.at(sessionId)) { status = napi_delete_reference(env, ref); CHECK_EQUAL_WITH_RETURN_VOID(status, napi_ok); } callbacks.erase(sessionId); } return; } napi_value callbackTmp; if (callbacks.count(sessionId) != 0) { auto lists = callbacks.at(sessionId); for (auto iter = lists.begin(); iter != lists.end();) { status = napi_get_reference_value(env, *iter, &callbackTmp); CHECK_EQUAL_WITH_RETURN_VOID(status, napi_ok); bool isEquals = false; napi_strict_equals(env, callbackTmp, callback, &isEquals); if (isEquals) { napi_delete_reference(env, *iter); iter = lists.erase(iter); } else { iter++; } } if (lists.empty()) { callbacks.erase(sessionId); } else { callbacks.insert_or_assign(sessionId, lists); } }}```
- ```javascriptstatusCallback : function (sessionId, networkid, status) { this.response += "status changed " + sessionId + " " + status + " " + networkId;}g_object.on("status", this.changeCallback);``` 删除对象的上下线变更监听,type固定为’status’,callback为可选参数,不设置表示删除该对象所有上下线监听。
- offWatch(type, this.__proxy, callback)。 distributedObject.off(type, obj, callback)调用JS接口C++ 部分函数,参数type为’status’,obj为数据对象,callback为回调函数。 distributedObject.deleteCallback(type, this.__objectId, callback)调用JS接口C++ 部分函数,参数type为’status’,__objectId为数据对象的ID号,callback为回调函数。从全局回调map中删除回调函数,要是callback为空,则删除此ID号的全部回调函数。
- JSDistributedObjectStore::JSOff。 使用napi解析获取传过来的参数type,使用napi_unwrap根据传入的参数获取之前绑定的wrapper对象。如果第三个参数为空,DeleteWatch参数handler为nullptr。 调用 JSWatcher::Off 进行删除。 参数handler为nullptr先调用 StatusEventListener::Clear(napi_env env) 删除所有回调。 参数handler不为nullptr调用 StatusEventListener::Del(napi_env env, napi_value handler) 删除对应的handler。 然后调用 NotifierImpl::GetInstance()->DelWatcher(sessionId_) 删除监控。 JSDistributedObjectStore::JSDeleteCallback。 使用napi解析获取传过来的参数type,objectId如果参数为两个调用 DelCallback(env, g_statusCallBacks, objectId) 进行注册的回调函数的删除,如果有第三个参数调用 DelCallback(env, g_statusCallBacks, objectId, argv[2]) 进行注册的回调函数的删除。 对于函数DelCallback如果第三个参数为nullptr则清空此ID的所有注册的回调函数。 否则删除对应注册的回调函数。如果是最后一个则把这个ID对象也删除了。
- 在 DistributedObject::setSessionId 接口中给对象属性都绑定了C++ 接口部分对应的接口函数。
在给属性赋值或修改值是会调用 JSDistributedObject::JSPut ,在获取属性的值时会调用 JSDistributedObject::JSGet 。
```c++const char *distributedObjectName = "DistributedObject";napi_property_descriptor distributedObjectDesc[] = { DECLARE_NAPI_FUNCTION("put", JSDistributedObject::JSPut), DECLARE_NAPI_FUNCTION("get", JSDistributedObject::JSGet),};napi_status status = napi_define_class(env, distributedObjectName, strlen(distributedObjectName), JSDistributedObject::JSConstructor, nullptr, sizeof(distributedObjectDesc) / sizeof(distributedObjectDesc[0]), distributedObjectDesc, &distributedObjectClass);CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok);if (g_instance == nullptr) { g_instance = new napi_ref; status = napi_create_reference(env, distributedObjectClass, 1, g_instance); CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok);}```
- ```javascriptg_object.name = "jack";g_object.age = 19;g_object.isVis = false; g_object.parent = {mother:"jack mom",father:"jack Dad"};g_object.list = [{mother:"jack mom"}, {father:"jack Dad"}];// 对端设备收到change回调,fields为name,age,isVis,parent和list```
- ```javascriptconsole.info("name " + g_object["name"]); //访问到的是组网内最新数据```
- 本文使用OpenHarmony 3.1版本的分布式数据对象模块源码,对模块提供的JS接口进行了源码级的介绍,对主要JS接口功能的实现流程也进行深入的介绍。分布式数据对象服务依赖于分布式数据管理,分布式软总线,系统能力管理等模块,感兴趣的读者可以研究下其他的模块的原理,可以更加方便的理解分布式数据对象。 本文一些JS例子代码来源于分布式对象用户手册。 想了解更多关于开源的内容,请访问: 51CTO 开源基础软件社区 https://ost.51cto.com。

分布式数据对象管理框架是一款面向对象的内存数据管理框架,向应用开发者提供内存对象的创建、查询、删除、修改、订阅等基本数据对象的管理能力,同时具备分布式能力,满足超级终端场景下,相同应用多设备间的数据对象协同需求。
分布式数据对象提供JS接口,让开发者能以使用本地对象的方式使用分布式对象。分布式数据对象支持的数据类型包括数字型、字符型、布尔型等基本类型,同时也支持数组、基本类型嵌套等复杂类型。
约束与限制:不同设备间只有相同bundleName的应用才能直接同步;不建议创建过多分布式对象,每个分布式对象将占用100-150KB内存;每个对象大小不超过500KB;如对复杂类型的数据进行修改,仅支持修改根属性,暂不支持下级属性修改。
分布式数据对象依赖于分布式数据服务。
项目路径:https://gitee.com/openharmony/distributeddatamgr_objectstore。
import distributedObject from '@ohos.data.distributedDataObject'
|
接口名称 |
描述 |
|
function createDistributedObject(source: object): DistributedObject; |
建分布式对象source中指定分布式对象中的属性返回值是创建出的分布式对象,接口见DistrubutedObject。 |
|
接口名称 |
描述 |
|
function createDistributedObject(source: object): DistributedObject; |
创建分布式对象source中指定分布式对象中的属性返回值是创建出的分布式对象,接口见DistrubutedObject。 |
|
function genSessionId(): string; |
随机创建sessionId,返回值是随机创建的sessionId。 |
|
接口名称 |
描述 |
|
setSessionId(sessionId: string): boolean; |
设置同步的sessionId,可信组网中有多个设备时,多个设备间的对象如果设置为同一个sessionId,就能自动同步sessionId是指定的sessionId,如果要退出分布式组网,设置为“”或不设置均可,返回值是操作结果,true标识设置session成功。 |
|
on(type: ‘change’, callback: Callback<{ sessionId: string, fields: Array }>): void; |
监听对象的变更type,固定为’change’,callback是变更时触发的回调,回调参数sessionId标识变更对象的sessionId,fields标识对象变更的属性名。 |
|
off(type: ‘change’, callback: Callback<{ sessionId: string, fields: Array } |
删除对象的变更监听,type固定为’change’,callback为可选参数,不设置表示删除该对象所有变更监听。 |
|
on(type: ‘status’, callback: Callback<{ sessionId: string, networkId: string, status: ‘online’ | ‘offline’ }>): void |
监听对象的变更,type固定为’status’,callback是变更时触发的回调,回调参数sessionId标识变更对象的sessionId,networkId标识对象设备的networkId,status标识对象为’online’(上线)或’offline’(下线)的状态。 |
|
off(type: ‘status’, callback: Callback<{ sessionId: string, deviceId: string, status: ‘online’ | ‘offline’ }>): void |
删除对象的变更监听,type固定为’change’,callback为可选参数,不设置表示删除该对象所有上下线监听。 |

JS接口C++部分frameworks\jskitsimpl\src\adaptor\js_module_init.cpp
```c++
static napi_value DistributedDataObjectExport(napi_env env, napi_value exports)
{
napi_status status;
static napi_property_descriptor desc[] = {
DECLARE_NAPI_FUNCTION("createObjectSync", JSDistributedObjectStore::JSCreateObjectSync),
DECLARE_NAPI_FUNCTION("destroyObjectSync", JSDistributedObjectStore::JSDestroyObjectSync),
DECLARE_NAPI_FUNCTION("on", JSDistributedObjectStore::JSOn),
DECLARE_NAPI_FUNCTION("off", JSDistributedObjectStore::JSOff),
DECLARE_NAPI_FUNCTION("recordCallback", JSDistributedObjectStore::JSRecordCallback),
DECLARE_NAPI_FUNCTION("deleteCallback", JSDistributedObjectStore::JSDeleteCallback),
};
status = napi_define_properties(env, exports, sizeof(desc) / sizeof(desc[0]), desc);
CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok);
return exports;
}
// storage module define
static napi_module storageModule = {
.nm_version = 1,
.nm_flags = 0,
.nm_filename = nullptr,
.nm_register_func = DistributedDataObjectExport,
.nm_modname = "data.distributedDataObject",
.nm_priv = ((void *)0),
.reserved = { 0 },
};
// distributeddataobject module register
static __attribute__((constructor)) void RegisterModule()
{
napi_module_register(&storageModule);
}
```
JS接口JS部分:interfaces\jskits\distributed_data_object.js。
```javascript
class Distributed {
constructor(obj) {
this.__proxy = obj;
Object.keys(obj).forEach(key => {
Object.defineProperty(this, key, {
enumerable: true,
configurable: true,
get: function () { return this.__proxy[key]; },
set: function (newValue) {
this.__proxy[key] = newValue; } }); });
Object.defineProperty(this, SESSION_ID, {
enumerable: true,
configurable: true,
get: function () { return this.__proxy[SESSION_ID]; },
set: function (newValue) {
this.__proxy[SESSION_ID] = newValue; } });
this.__objectId = randomNum();
console.info("constructor success ");
};
setSessionId(sessionId) {
if (sessionId == null || sessionId == "") {
leaveSession(this.__proxy); return false; }
if (this.__proxy[SESSION_ID] == sessionId) {
console.info("same session has joined " + sessionId);
return true;
}
leaveSession(this.__proxy);
let object = joinSession(this.__proxy, this.__objectId, sessionId);
if (object != null) { this.__proxy = object; return true; }
return false;
};
on(type, callback) { onWatch(type, this.__proxy, callback);
distributedObject.recordCallback(type, this.__objectId, callback);
};
off(type, callback) { offWatch(type, this.__proxy, callback);
distributedObject.deleteCallback(type, this.__objectId, callback);
};
__proxy;
__objectId;
}
export default {
createDistributedObject: newDistributed,
genSessionId: randomNum
}
```
```javascript
// 创建对象,对象包含3个基本属性:name,age和isVis;2个复杂属性:parent,list
var g_object = distributedObject.createDistributedObject({name:undefined, age:undefined, isVis:true,
parent:undefined, list:undefined});
```
// 创建对象,对象包含3个基本属性:name,age和isVis;2个复杂属性:parent,list
var g_object = distributedObject.createDistributedObject({name:undefined, age:undefined, isVis:true,
parent:undefined, list:undefined});
```
创建一个 Distributed 对象,此对象为node.js类 Distributed的对象。
此类有两个成员:__proxy 存储传入的参数对象,当前对象添加传入的参数各个key为属性名的属性,每个属性定义get和set方法;并加上 __sessionId 为属性名的属性,定义get和set属性。 __objectId 存储对象ID,此值为一个随机值。返回创建的对象。
```javascript
constructor(obj) {
this.__proxy = obj;
Object.keys(obj).forEach(key => {
Object.defineProperty(this, key, {
enumerable: true,
configurable: true,
get: function () {
return this.__proxy[key];
},
set: function (newValue) {
this.__proxy[key] = newValue;
}
});
});
Object.defineProperty(this, SESSION_ID, {
enumerable: true,
configurable: true,
get: function () {
return this.__proxy[SESSION_ID];
},
set: function (newValue) {
this.__proxy[SESSION_ID] = newValue;
}
});
this.__objectId = randomNum();
console.info("constructor success ");
};
```
获取长度为8的由随机数字构成的字符串。
发起方
javascript g_object.setSessionId(distributedObject.genSessionId())。
被拉起方
javascript //sessionId与发起方的__sessionId一致 g_object.setSessionId(sessionId)。
设置同步的sessionId,多个可信组网中有多个设备时,多个设备间的对象如果设置为同一个sessionId,就能自动同步。如果要退出分布式组网,设置为“”或不设置均可。
如果sessionId和缓存存储的sessionId是一样的,直接返回true,如果不相同,先调用 distributedObject.destroyObjectSync(obj) 删除旧的sessionId,再调用 distributedObject.createObjectSync(sessionId, objectId) 利用sessionId创建新的数据对象。然后缓存存储新的sessionId。
distributedObject.createObjectSync(js_module_init.cpp)流程:

通过上面函数调用等操作, Distributed 对象的成员 this.__proxy 的各个属性的get和put方法和接口导出的get和put关联起来了 JSObjectWrapper 内部存的 object_ 对象实际为 DistributedObjectImpl 对象。
JSDistributedObjectStore::JSDestroyObjectSync 流程。

```javascript
changeCallback : function (sessionId, changeData) {
console.info("change" + sessionId + " " + this.response);
if (changeData != null && changeData != undefined) {
changeData.forEach(element => {
console.info("changed !" + element + " " + g_object[element]);
});
}
}
g_object.on("change", this.changeCallback);
```
changeCallback : function (sessionId, changeData) {
console.info("change" + sessionId + " " + this.response);
if (changeData != null && changeData != undefined) {
changeData.forEach(element => {
console.info("changed !" + element + " " + g_object[element]);
});
}
}
g_object.on("change", this.changeCallback);
```
开启对对象数据变更的监听,type固定为’change’,callback是变更时触发的回调,回调参数sessionId标识变更对象的sessionId,fields标识对象变更的属性名。
- onWatch(type, this.__proxy, callback)。
- distributedObject.on(type, obj, callback)调用JS接口C++ 部分的函数,参数type为’change’,obj为数据对象,callback为回调函数。
- distributedObject.recordCallback(type, this.__objectId, callback)调用JS接口C++ 部分的函数,参数type为’change’,__objectId为数据对象的ID号,callback为回调函数。添加回调函数到全局回调map中。
JSDistributedObjectStore::JSOn。

使用napi解析传过来的参数type,objectId,callbackType调用 AddCallback(env, g_changeCallBacks, objectId, argv[2]) 添加回调到对应的全局map回调中。
JSDistributedObjectStore::AddCallback 函数:
c++ void JSDistributedObjectStore::AddCallback(napi_env env, std::map<std::string, std::list<napi_ref>> &callbacks, const std::string &objectId, napi_value callback) { LOG_INFO("add callback %{public}s", objectId.c_str()); napi_ref ref = nullptr; napi_status status = napi_create_reference(env, callback, 1, &ref); CHECK_EQUAL_WITH_RETURN_VOID(status, napi_ok); if (callbacks.count(objectId) != 0) { auto lists = callbacks.at(objectId); lists.push_back(ref); callbacks.insert_or_assign(objectId, lists); } else { std::list<napi_ref> lists = { ref }; callbacks.insert_or_assign(objectId, lists); } }
napi_status status = napi_create_reference(env, callback, 1, &ref) 创建回调的引用,添加到key为mapobjectId对应的list中。即一个objectId可以对应多个回调引用,存储再一个list中。
```javascript
statusCallback : function (sessionId, networkid, status) {
this.response += "status changed " + sessionId + " " + status + " " + networkId;
}
g_object.on("status", this.changeCallback);
```
statusCallback : function (sessionId, networkid, status) {
this.response += "status changed " + sessionId + " " + status + " " + networkId;
}
g_object.on("status", this.changeCallback);
```
开启对对象上下线变更的监听,type固定为’status’,callback是变更时触发的回调,回调参数sessionId标识变更对象的sessionId,fields标识对象变更的属性名。
- onWatch(type, this.__proxy, callback)distributedObject.on(type, obj, callback)调用JS接口C++
部分的函数,参数type为’status’,obj为数据对象,callback为回调函数。
distributedObject.recordCallback(type, this.__objectId, callback)调用JS接口C++ 部分的函数,参数type为’status’,__objectId为数据对象的ID号,callback为回调函数。添加回调函数到全局回调map中。
部分的函数,参数type为’status’,obj为数据对象,callback为回调函数。
distributedObject.recordCallback(type, this.__objectId, callback)调用JS接口C++ 部分的函数,参数type为’status’,__objectId为数据对象的ID号,callback为回调函数。添加回调函数到全局回调map中。
JSDistributedObjectStore::JSOn。

使用napi解析传过来的参数type,objectId,callbackType调用 AddCallback(env, g_statusCallBacks, objectId, argv[2]) 添加回调到对应的全局map回调中。
```javascript
//删除变更回调changeCallback
g_object.off("change", changeCallback);
//删除所有的变更回调
g_object.off("change");
```
//删除变更回调changeCallback
g_object.off("change", changeCallback);
//删除所有的变更回调
g_object.off("change");
```
删除对数据对象的数据变更监听,type固定为’change’,callback为可选参数,不设置表示删除该对象所有变更监听。
offWatch(type, this.__proxy, callback).
distributedObject.off(type, obj, callback) 调用JS接口C++ 部分函数,参数type为’change’,obj为数据对象,callback为回调函数。
distributedObject.deleteCallback(type, this.__objectId, callback) 调用JS接口C++ 部分函数,参数type为’change’,__objectId为数据对象的ID号,callback为回调函数。从全局回调map中删除回调函数,要是callback为空,则删除此ID号的全部回调函数。
JSDistributedObjectStore::JSOff。
使用napi解析获取传过来的参数type,使用napi_unwrap根据传入的参数获取之前绑定的wrapper对象。如果第三个参数为空,DeleteWatch参数handler为nullptr。
调用 JSWatcher::Off 进行删除。
参数handler为nullptr先调用 ChangeEventListener::Clear(napi_env env) 删除所有回调。
```c++
void ChangeEventListener::Clear(napi_env env)
{
EventListener::Clear(env);
if (isWatched_ && object_ != nullptr) {
uint32_t ret = objectStore_->UnWatch(object_);
if (ret != SUCCESS) {
LOG_ERROR("UnWatch %{public}s error", object_->GetSessionId().c_str());
} else {
LOG_INFO("UnWatch %{public}s success", object_->GetSessionId().c_str());
isWatched_ = false;
}
}
}
```
参数handler不为nullptr先调用 ChangeEventListener::Del(napi_env env, napi_value handler) 删除对应的handler。
```c++
bool ChangeEventListener::Del(napi_env env, napi_value handler)
{
bool isEmpty = EventListener::Del(env, handler);
if (isEmpty && isWatched_ && object_ != nullptr) {
uint32_t ret = objectStore_->UnWatch(object_);
if (ret != SUCCESS) {
LOG_ERROR("UnWatch %{public}s error", object_->GetSessionId().c_str());
} else {
LOG_INFO("UnWatch %{public}s success", object_->GetSessionId().c_str());
isWatched_ = false;
}
}
return isEmpty;
}
```
再调用 *DistributedObjectStoreImpl::UnWatch(DistributedObject object) 删除分布式数据服务内的监听。
```c++
uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
{
if (object == nullptr) {
LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
return ERR_NULL_OBJECT;
}
if (flatObjectStore_ == nullptr) {
LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
return ERR_NULL_OBJECTSTORE;
}
uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
if (status != SUCCESS) {
LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
return status;
}
watchers_.erase(object);
LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
return SUCCESS;
}
```
```c++
uint32_t FlatObjectStore::UnWatch(const std::string &sessionId)
{
if (!storageEngine_->isOpened_) {
LOG_ERROR("FlatObjectStore::DB has not inited");
return ERR_DB_NOT_INIT;
}
uint32_t status = storageEngine_->UnRegisterObserver(sessionId);
if (status != SUCCESS) {
LOG_ERROR("FlatObjectStore::Watch failed %{public}d", status);
}
return status;
}
```
```c++
uint32_t FlatObjectStorageEngine::UnRegisterObserver(const std::string &key)
{
if (!isOpened_) {
LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init");
return ERR_DB_NOT_INIT;
}
std::unique_lock<std::shared_mutex> lock(operationMutex_);
if (delegates_.count(key) == 0) {
LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
return ERR_DB_NOT_EXIST;
}
auto iter = observerMap_.find(key);
if (iter == observerMap_.end()) {
LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver observer not exist.");
return ERR_NO_OBSERVER;
}
auto delegate = delegates_.at(key);
std::shared_ptr<TableWatcher> watcher = iter->second;
LOG_INFO("start UnRegisterObserver %{public}s", key.c_str());
DistributedDB::DBStatus status = delegate->UnRegisterObserver(watcher.get());
if (status != DistributedDB::DBStatus::OK) {
LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status);
return ERR_UNRIGSTER;
}
LOG_INFO("end UnRegisterObserver %{public}s", key.c_str());
observerMap_.erase(key);
return SUCCESS;
}
```
JSDistributedObjectStore::JSDeleteCallback。
使用napi解析获取传过来的参数type,objectId如果参数为两个调用 DelCallback(env, g_changeCallBacks, objectId) 进行注册的回调函数的删除,如果有第三个参数调用 DelCallback(env, g_changeCallBacks, objectId, argv[2]) 进行注册的回调函数的删除。
对于函数DelCallback如果第三个参数为nullptr则清空此ID的所有注册的回调函数。
否则删除对应注册的回调函数。如果是最后一个则把这个ID对象也删除了。
```c++
void JSDistributedObjectStore::DelCallback(napi_env env, std::map<std::string, std::list<napi_ref>> &callbacks,
const std::string &sessionId, napi_value callback)
{
LOG_INFO("del callback %{public}s", sessionId.c_str());
napi_status status;
if (callback == nullptr) {
if (callbacks.count(sessionId) != 0) {
for (auto ref : callbacks.at(sessionId)) {
status = napi_delete_reference(env, ref);
CHECK_EQUAL_WITH_RETURN_VOID(status, napi_ok);
}
callbacks.erase(sessionId);
}
return;
}
napi_value callbackTmp;
if (callbacks.count(sessionId) != 0) {
auto lists = callbacks.at(sessionId);
for (auto iter = lists.begin(); iter != lists.end();) {
status = napi_get_reference_value(env, *iter, &callbackTmp);
CHECK_EQUAL_WITH_RETURN_VOID(status, napi_ok);
bool isEquals = false;
napi_strict_equals(env, callbackTmp, callback, &isEquals);
if (isEquals) {
napi_delete_reference(env, *iter);
iter = lists.erase(iter);
} else {
iter++;
}
}
if (lists.empty()) {
callbacks.erase(sessionId);
} else {
callbacks.insert_or_assign(sessionId, lists);
}
}
}
```
```javascript
statusCallback : function (sessionId, networkid, status) {
this.response += "status changed " + sessionId + " " + status + " " + networkId;
}
g_object.on("status", this.changeCallback);
```
statusCallback : function (sessionId, networkid, status) {
this.response += "status changed " + sessionId + " " + status + " " + networkId;
}
g_object.on("status", this.changeCallback);
```
删除对象的上下线变更监听,type固定为’status’,callback为可选参数,不设置表示删除该对象所有上下线监听。
- offWatch(type, this.__proxy, callback)。
- distributedObject.off(type, obj, callback)调用JS接口C++ 部分函数,参数type为’status’,obj为数据对象,callback为回调函数。
- distributedObject.deleteCallback(type, this.__objectId, callback)调用JS接口C++ 部分函数,参数type为’status’,__objectId为数据对象的ID号,callback为回调函数。从全局回调map中删除回调函数,要是callback为空,则删除此ID号的全部回调函数。
JSDistributedObjectStore::JSOff。
使用napi解析获取传过来的参数type,使用napi_unwrap根据传入的参数获取之前绑定的wrapper对象。如果第三个参数为空,DeleteWatch参数handler为nullptr。
调用 JSWatcher::Off 进行删除。
参数handler为nullptr先调用 StatusEventListener::Clear(napi_env env) 删除所有回调。
参数handler不为nullptr调用 StatusEventListener::Del(napi_env env, napi_value handler) 删除对应的handler。
然后调用 NotifierImpl::GetInstance()->DelWatcher(sessionId_) 删除监控。
JSDistributedObjectStore::JSDeleteCallback。
使用napi解析获取传过来的参数type,objectId如果参数为两个调用 DelCallback(env, g_statusCallBacks, objectId) 进行注册的回调函数的删除,如果有第三个参数调用 DelCallback(env, g_statusCallBacks, objectId, argv[2]) 进行注册的回调函数的删除。
对于函数DelCallback如果第三个参数为nullptr则清空此ID的所有注册的回调函数。
否则删除对应注册的回调函数。如果是最后一个则把这个ID对象也删除了。
在 DistributedObject::setSessionId 接口中给对象属性都绑定了C++ 接口部分对应的接口函数。
在给属性赋值或修改值是会调用 JSDistributedObject::JSPut ,在获取属性的值时会调用 JSDistributedObject::JSGet 。
```c++
const char *distributedObjectName = "DistributedObject";
napi_property_descriptor distributedObjectDesc[] = {
DECLARE_NAPI_FUNCTION("put", JSDistributedObject::JSPut),
DECLARE_NAPI_FUNCTION("get", JSDistributedObject::JSGet),
};
napi_status status = napi_define_class(env, distributedObjectName, strlen(distributedObjectName),
JSDistributedObject::JSConstructor, nullptr, sizeof(distributedObjectDesc) / sizeof(distributedObjectDesc[0]),
distributedObjectDesc, &distributedObjectClass);
CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok);
if (g_instance == nullptr) {
g_instance = new napi_ref;
status = napi_create_reference(env, distributedObjectClass, 1, g_instance);
CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok);
}
```
```javascript
g_object.name = "jack";
g_object.age = 19;
g_object.isVis = false;
g_object.parent = {mother:"jack mom",father:"jack Dad"};
g_object.list = [{mother:"jack mom"}, {father:"jack Dad"}];
// 对端设备收到change回调,fields为name,age,isVis,parent和list
```
g_object.name = "jack";
g_object.age = 19;
g_object.isVis = false;
g_object.parent = {mother:"jack mom",father:"jack Dad"};
g_object.list = [{mother:"jack mom"}, {father:"jack Dad"}];
// 对端设备收到change回调,fields为name,age,isVis,parent和list
```

```javascript
console.info("name " + g_object["name"]); //访问到的是组网内最新数据
```
console.info("name " + g_object["name"]); //访问到的是组网内最新数据
```

本文使用OpenHarmony 3.1版本的分布式数据对象模块源码,对模块提供的JS接口进行了源码级的介绍,对主要JS接口功能的实现流程也进行深入的介绍。分布式数据对象服务依赖于分布式数据管理,分布式软总线,系统能力管理等模块,感兴趣的读者可以研究下其他的模块的原理,可以更加方便的理解分布式数据对象。
本文一些JS例子代码来源于分布式对象用户手册。