diff --git a/reconf/config/device-groups/lights.json b/reconf/config/device-groups/lights.json new file mode 100644 index 0000000..5e6d581 --- /dev/null +++ b/reconf/config/device-groups/lights.json @@ -0,0 +1,6 @@ +[ + { + "name": "lights", + "types": ["ceiling-lamp", "light-stip"] + } +] \ No newline at end of file diff --git a/reconf/config/device-groups/switches.json b/reconf/config/device-groups/switches.json new file mode 100644 index 0000000..39fdc8d --- /dev/null +++ b/reconf/config/device-groups/switches.json @@ -0,0 +1,6 @@ +[ + { + "name": "switches", + "types": ["wall-switch", "remote-switch"] + } +] \ No newline at end of file diff --git a/reconf/config/serviceconf/bedroomLamp.json b/reconf/config/serviceconf/bedroomLamp.json new file mode 100644 index 0000000..743d622 --- /dev/null +++ b/reconf/config/serviceconf/bedroomLamp.json @@ -0,0 +1,9 @@ +[{ + "type": "ceiling-lamp-switch", + "uuid": "bed1", + "mainDevices": ["bri", "fra"], + "replacementDevices": "", + "room": "bedroom", + "configMsg": "", + "online": true +}] \ No newline at end of file diff --git a/reconf/config/serviceconf/bedroomSwitches.json b/reconf/config/serviceconf/bedroomSwitches.json new file mode 100644 index 0000000..4c0c00a --- /dev/null +++ b/reconf/config/serviceconf/bedroomSwitches.json @@ -0,0 +1,27 @@ +[{ + "type": "wall-switch", + "uuid": "bri", + "mainDevices": [], + "replacementDevices": "switches", + "room": "bedroom", + "configMsg": "", + "online": true +}, +{ + "type": "remote-switch", + "uuid": "fra", + "mainDevices": [], + "replacementDevices": "switches", + "room": "bedroom", + "configMsg": "", + "online": true +}, +{ + "type": "remote-switch", + "uuid": "fra2", + "mainDevices": [], + "replacementDevices": "switches", + "room": "bedroom", + "configMsg": "", + "online": true +}] \ No newline at end of file diff --git a/reconf/config/services/ceiling-lamp.json b/reconf/config/services/ceiling-lamp-switch.json similarity index 100% rename from reconf/config/services/ceiling-lamp.json rename to reconf/config/services/ceiling-lamp-switch.json diff --git a/reconf/config/services/remote-switch.json b/reconf/config/services/remote-switch.json new file mode 100644 index 0000000..fbd92ff --- /dev/null +++ b/reconf/config/services/remote-switch.json @@ -0,0 +1,4 @@ +{ + "image": "light-switch", + "args": [] +} \ No newline at end of file diff --git a/reconf/config/services/wall-switch.json b/reconf/config/services/wall-switch.json new file mode 100644 index 0000000..fbd92ff --- /dev/null +++ b/reconf/config/services/wall-switch.json @@ -0,0 +1,4 @@ +{ + "image": "light-switch", + "args": [] +} \ No newline at end of file diff --git a/reconf/src/PersistentStorage.ts b/reconf/src/PersistentStorage.ts new file mode 100644 index 0000000..a676c47 --- /dev/null +++ b/reconf/src/PersistentStorage.ts @@ -0,0 +1,24 @@ +import { ServiceEntry } from "./db"; + +/** The service UUID. */ +export type Uuid = string; +export type Room = string; + +/** The string describing the type of a service. */ +export type ServiceType = string; + +/** The array of selected service type and instances. */ +export type ServiceSelection = [ServiceType, ServiceEntry[]][]; +/** Options for specifying which changes need to be made in the database. */ +export type ConfigUpdates = { + del: Uuid[]; +}; + +export interface PersistentStorage { + createService(service: ServiceEntry): void; + updateService(service: ServiceEntry): Promise; + removeService(uuid: Uuid): Promise; + queryServices(): Promise; + queryService(uuid: Uuid): Promise; + queryServicesInRoom(room: Room): Promise; +}; \ No newline at end of file diff --git a/reconf/src/app.ts b/reconf/src/app.ts index b6911ea..6b4e3d6 100644 --- a/reconf/src/app.ts +++ b/reconf/src/app.ts @@ -3,6 +3,7 @@ import 'source-map-support/register'; import { env as getEnv, util } from 'horme-common'; import fail from './fail'; import srv from './service'; +import db, { initMissingServices } from './db'; import { resetDatabase } from './neo4j'; const env = getEnv.readEnvironment('reconf'); @@ -22,7 +23,7 @@ main().catch((err) => util.abort(err)); async function main() { logger.setLogLevel(env.logLevel); await resetDatabase(); + await db.initializeDatabase(); await fail.setupFailureListener(); - await srv.configureServices(); logger.info('initial configuration instantiated, listening...'); } diff --git a/reconf/src/db.ts b/reconf/src/db.ts index 34aa3d1..a0c910b 100644 --- a/reconf/src/db.ts +++ b/reconf/src/db.ts @@ -1,96 +1,274 @@ -import { addConfigToDB} from './neo4j'; -import { ServiceType, Uuid } from './service'; +import { returnQuery } from './neo4j'; +import { setDependencies, ServiceType, Uuid } from './service'; +import { ServiceConfig, parseAs, util } from 'horme-common'; +import path from 'path'; +import { QueryResult } from 'neo4j-driver'; +import { createService } from './service'; -export default { queryServiceSelection }; +export default { initializeDatabase }; + +const logger = util.logger; /** The array of selected service type and instances. */ export type ServiceSelection = [ServiceType, ServiceEntry[]][]; + +export type DeviceGroup = { + name: string; + types: [string]; +}; + /** Options for specifying which changes need to be made in the database. */ export type ConfigUpdates = { del: Uuid[]; }; /** The description of an un-instantiated service and its dependencies. */ export type ServiceEntry = { - uuid: Uuid; - type: ServiceType; - room: string | null; - depends: Uuid[]; + type: string; + uuid: string; + mainDevices: [string]; + replacementDevices: [string]; + room: string; + configMsg: string; + online: boolean; }; /********** implementation ************************************************************************/ -async function queryServiceSelection(updates?: ConfigUpdates): Promise { - if (updates) { - if (updateCount === 0) { - console.assert(updates.del[0] === 'fra'); - config.set('light-switch', [bedroomSwitch1]); - bedroomLamp.depends = [bedroomSwitch1.uuid]; - failureReasoner.depends = [bedroomSwitch1.uuid]; - updateCount = 1; - } else if (updateCount === 1) { - // console.assert(updates.del[0] === 'bri'); - // config.set('camera-motion-detect', [camera]); - // bedroomLamp.depends = []; - // failureReasoner.depends = []; - // updateCount = 2; - } else { - throw new Error('exceeded bounds of static reconfiguration scenario'); - } +async function initializeDatabase() { + logger.info('Import external Services...'); + await importServices(); + logger.info('Import external Device Groups...'); + await importDeviceGroups(); + logger.info('Import external MainDevices...'); + await initMissingServices(); +} + +//TODO: check for redundant uuides +async function importServices() { + const serviceFolder = './config/serviceconf/'; + const fs = require('fs'); + const files = await fs.readdirSync(serviceFolder); + + for (let file of files) { + let fullPath = path.join(serviceFolder, file); + let config: Array = JSON.parse(fs.readFileSync(fullPath.toString(), 'utf8')); + for (const x of config) { + addService(x); + }; + }; +}; + +export async function getSEfromUuid(uuid: string): Promise { + const a: string = 'MATCH (n: Service { uuid: \'' + uuid + '\'}) RETURN n.type, n.uuid, n.mainDevices, n.replacementDevices, n.room, n.configMsg, n.online'; + const query = await returnQuery(a); + if (query.records.length != 0) { + let x = query.records[0]; + const entry: ServiceEntry = { + type: x.get('n.type'), + uuid: x.get('n.uuid'), + mainDevices: x.get('n.mainDevices'), + replacementDevices: x.get('n.replacementDevices'), + room: x.get('n.room'), + configMsg: x.get('n.configMsg'), + online: x.get('n.online'), + }; + return entry; } + return undefined; +} + +//TODO: currently always first replacement devices from type t is used +//BUG: When searching a replacement device for a device which is offline and the device-group of the missing device-type is the same as a later main device, +// - the replacement device could 'steal' the device of a later main-device, which then also needs a replacement device, as the main device is already in use. + +async function alternativeConfiguration(dev:string, to:string) { + logger.info('searching alternative for device \'' + dev + '\'!'); + + //get all importent attributes from dev + const repldev: string = 'MATCH (n: Service) WHERE n.uuid = \'' + dev + '\' RETURN n.replacementDevices, n.room'; + let realdev = await returnQuery(repldev); + + let devGroup = realdev.records[0].get('n.replacementDevices'); + let room = realdev.records[0].get('n.room'); + + //get all types from group + const grouprq: string = 'MATCH (n: DeviceGroup: ' + devGroup + ') RETURN n.devices'; + + let groupres = await returnQuery(grouprq); - await addConfigToDB(Array.from(config)); + //get device types (sorted by prio) + var splitted = groupres.records[0].get('n.devices').split(',', 30); + for (let type of splitted) { + type = type.split('-').join('_'); + + //for each replacement type, search for devices in the current room + const e: string = 'MATCH (n: Service: ' + type + '), ( m: Service ) WHERE n.online = \'true\' AND m.uuid = \'' + to + '\' AND n.room = \'' + room + '\' AND NOT (n)-[:SUBSCRIBE]->(m) RETURN n.uuid'; + let back = await returnQuery(e); + if(back.records.length != 0){ + let newuuid = back.records[0].get('n.uuid'); + logger.debug('Found replacement device for \'' + dev + '\' with name \'' + newuuid + '\''); + return newuuid; + } + } + return null; - return Array.from(config); } -export async function queryService(uuid: string): Promise { - for (let [_, value] of config) { - for (let entry of value) { - if (entry.uuid === uuid) return entry; - } +//remove service from db +export async function removeService(uuid: string): Promise { + const a: string = 'MATCH (n: Service { uuid: \'' + uuid + '\' }) RETURN n'; + let back = await returnQuery(a); + if(back.records.length != 0){ + logger.info('Removing service with uuid \'' + uuid + '\'!'); + + //DETACH implies that all relations are deleted too + const removeQuery: string = 'MATCH (n: Service { uuid: \'' + uuid + '\' }) DETACH DELETE n'; + back = await returnQuery(removeQuery); } - return undefined; } -const bedroomSwitch1: ServiceEntry = { - uuid: 'bri', - room: 'bedroom', - type: 'light-switch', - depends: [], -}; +//set service offline +export async function disableService(uuid: string): Promise { -const bedroomSwitch2: ServiceEntry = { - uuid: 'fra', - room: 'bedroom', - type: 'light-switch', - depends: [], -}; + //check if service is available + const a: string = 'MATCH (n: Service { uuid: \'' + uuid + '\' }) RETURN n'; + let back = await returnQuery(a); + if(back.records.length != 0){ + logger.info('Set service with uuid \'' + uuid + '\' offline!'); -const bedroomLamp: ServiceEntry = { - uuid: 'abc', - room: 'bedroom', - type: 'ceiling-lamp', - depends: [bedroomSwitch1.uuid, bedroomSwitch2.uuid], -}; + //DETACH implies that all relations are deleted too + const removeQuery: string = 'MATCH (n: Service { uuid: \'' + uuid + '\' }) SET n.online = \'false\''; + back = await returnQuery(removeQuery); + } +} -const camera: ServiceEntry = { - uuid: 'cam', - room: 'bedroom', - type: 'camera-motion-detect', - depends: [], -}; -const failureReasoner: ServiceEntry = { - uuid: 'flr', - room: null, - type: 'failure-reasoner', - depends: [bedroomSwitch1.uuid, bedroomSwitch2.uuid], -}; +//should be called when new devices are added to net network/switched state to online again +export async function initMissingServices() { + // Search for devices which are not configured + const a: string = 'MATCH (n: Service) WHERE n.configured = \'false\' AND NOT n.mainDevices = \'\' RETURN n.mainDevices, n.uuid'; + const res = await returnQuery(a); + + //configure them + configureServices(res); +} + + +//create configurations for services +async function configureServices(res: QueryResult) { + if (res.records.length != 0) { + //iterate over all devices with main devices + for(const record of res.records) { + //iterate over all searched main devices + let md = record.get('n.mainDevices'); + let uuid = record.get('n.uuid'); + var splitted = md.split(',', 30); + for (let singlemd of splitted) { + + //iterate over all maindevices + const d: string = 'MATCH (n: Service) WHERE n.uuid = \'' + singlemd + '\' RETURN n.uuid, n.replacementDevices'; + const res1 = await returnQuery(d); + if (res1.records.length == 0) { + + //if the device does not exists, abort + logger.error('Device with uuid \'' + singlemd + '\' does not exist in the database!'); + return; + } + + // check if the desired device is available + const e: string = 'MATCH (n: Service), (m: Service) WHERE n.online = \'true\' AND n.uuid = \'' + singlemd + '\' AND m.uuid = \'' + uuid + '\' AND NOT (n)-[:SUBSCRIBE]->(m) RETURN n.uuid'; + const res2 = await returnQuery(e); + if (res2.records.length == 0) { + logger.warn('Device with uuid \'' + singlemd + '\' is not available for this configuration!'); + + //desired device is not available,search for an alternative + let alt = await alternativeConfiguration(singlemd, uuid); + if(alt) { + initRelationship(uuid, alt); + } else { + logger.info('No replacement device found for \'' + singlemd + '\''); + return; + } + } else { + // desired device is available, add relationship + initRelationship(uuid, singlemd); + } + setDependencies(singlemd); + } + setDependencies(uuid); + const finished: string = 'MATCH (n: Service { uuid: \'' + uuid + '\' }) SET n.configured = \'true\''; + await returnQuery(finished); + + }; + + } else { + logger.error('Nothing to configure. Check your Queries!'); + } +} + +//Adding a Subscribe relation between devices with uuid dev1 and dev2. +async function initRelationship(dev1:string, dev2:string) { + logger.info('Adding relation from \"' + dev1 + '\" to \"' + dev2 + '\".'); + const e: string = 'MATCH (n: Service {uuid: \'' + dev1 + '\'}), (m: Service {uuid: \'' + dev2 + '\'}) CREATE (m)-[r:SUBSCRIBE]->(n)'; + await returnQuery(e); + return; +} -const config: Map = new Map([ - ['ceiling-lamp', [bedroomLamp]], - ['light-switch', [bedroomSwitch1, bedroomSwitch2]], - //['failure-reasoner', [failureReasoner]] -]); +//read device types from json +async function importDeviceGroups() { + const deviceGroupsFolder = './config/device-groups/'; + const fs = require('fs'); + const files = await fs.readdirSync(deviceGroupsFolder); + + for (let file of files) { + let fullPath = path.join(deviceGroupsFolder, file); + let config: Array = JSON.parse(fs.readFileSync(fullPath.toString(), 'utf8')); + for (const x of config) { + + //Walkaround for illegal '-' in typename + let name = x.name; + name = name.split('-').join('_'); + + // Add device group to DB + const a: string = 'MATCH (n: DeviceGroup:' + name + ' { devices: \'' + x.types + '\' }) RETURN n'; + let res = await returnQuery(a); + if (res.records.length == 0) { + const b: string = 'CREATE (n: DeviceGroup:' + name + ' { devices: \'' + x.types + '\' })'; + await returnQuery(b); + } + }; + }; +} + +//create service based on a service entry +export async function addService(x: ServiceEntry){ + + const fs = require('fs'); + + //Walkaround for illegal '-' in typename + let type = x.type; + let newworld = type.split('-').join('_'); + + // If Device does not exist, add it to DB + const a: string = 'MATCH (n: Service:' + newworld + ' { uuid: \'' + x.uuid + '\', mainDevices: \'' + x.mainDevices + '\', type: \'' + type + '\', replacementDevices: \'' + x.replacementDevices + '\', online: \'' + x.online + '\', room: \'' + x.room + '\' }) RETURN n'; + + const query = await returnQuery(a); + if (query.records.length == 0) { + + //check if service can be configured (has main devices) + if (x.mainDevices.length > 0) { + const b: string = 'CREATE (n: Service:' + newworld + ' { uuid: \'' + x.uuid + '\', mainDevices: \'' + x.mainDevices + '\', type: \'' + type + '\', replacementDevices: \'' + x.replacementDevices + '\', online: \'' + x.online + '\', room: \'' + x.room + '\', configured: \'false\' })'; + await returnQuery(b); + } else { + const b: string = 'CREATE (n: Service:' + newworld + ' { uuid: \'' + x.uuid + '\', mainDevices: \'' + x.mainDevices + '\', type: \'' + type + '\', replacementDevices: \'' + x.replacementDevices + '\', online: \'' + x.online + '\', room: \'' + x.room + '\', configured: \'true\' })'; + await returnQuery(b); + } + const file = await fs.readFileSync(`./config/services/${type}.json`, 'utf8'); + const back = await parseAs(ServiceConfig, JSON.parse(file.toString())); + if (!back) { + return; + } + await createService(x, back); + } +} -let updateCount = 0; diff --git a/reconf/src/db/PersistentStorageQueries.ts b/reconf/src/db/PersistentStorageQueries.ts new file mode 100644 index 0000000..edc132d --- /dev/null +++ b/reconf/src/db/PersistentStorageQueries.ts @@ -0,0 +1,93 @@ + +import { addService, removeService, ServiceEntry } from "../db"; +import { returnQuery } from "../neo4j"; +import { PersistentStorage } from "../PersistentStorage"; +import { util } from 'horme-common'; + +const logger = util.logger; + +//This class is used as an api for communication with the db + +export class PseudoPersistentStorage implements PersistentStorage { + + //Adds ServiceEntry to db and instantiateService + //TODO: Add return for successful instantiation + async createService(service: ServiceEntry) { + addService(service); + } + async updateService(service: ServiceEntry): Promise { + throw new Error("Method not implemented."); + } + + //Removes service from DB + async removeService(uuid: string): Promise { + removeService(uuid); + } + + //returns all existing service in the database as ServiceEntity + async queryServices(): Promise { + const entries: ServiceEntry[] = []; + const a: string = 'MATCH (n: Service) RETURN n'; + const query = await returnQuery(a); + if (query.records.length != 0) { + for (let x of query.records) { + const entry: ServiceEntry = { + type: x.get('n.type'), + uuid: x.get('n.uuid'), + mainDevices: x.get('n.mainDevices'), + replacementDevices: x.get('n.replacementDevices'), + room: x.get('n.room'), + configMsg: x.get('n.configMsg'), + online: x.get('n.online'), + }; + entries.push(entry); + } + + } + return entries; + } + + //returns service entry of service with uuid + async queryService(uuid: string): Promise { + const a: string = 'MATCH (n: Service { uuid: \'' + uuid + '\'}) RETURN n'; + const query = await returnQuery(a); + if (query.records.length != 0) { + if (query.records.length > 0) { + logger.error('possible redundancies found') + } + const entry: ServiceEntry = { + type: query.records[0].get('n.type'), + uuid: query.records[0].get('n.uuid'), + mainDevices: query.records[0].get('n.mainDevices'), + replacementDevices: query.records[0].get('n.replacementDevices'), + room: query.records[0].get('n.room'), + configMsg: query.records[0].get('n.configMsg'), + online: query.records[0].get('n.online'), + }; + return entry; + + } + return undefined; + } + + //get all service entries from room + async queryServicesInRoom(room: string): Promise { + const entries: ServiceEntry[] = []; + const a: string = 'MATCH (n: Service: { room: \'' + room + '\'}) RETURN n'; + const query = await returnQuery(a); + for (let x of query.records) { + const entry: ServiceEntry = { + type: x.get('n.type'), + uuid: x.get('n.uuid'), + mainDevices: x.get('n.mainDevices'), + replacementDevices: x.get('n.replacementDevices'), + room: x.get('n.room'), + configMsg: x.get('n.configMsg'), + online: x.get('n.online'), + }; + entries.push(entry); + + } + return entries; + } +}; diff --git a/reconf/src/failure/DefaultHandler.ts b/reconf/src/failure/DefaultHandler.ts index 30f9b9f..54c3d70 100644 --- a/reconf/src/failure/DefaultHandler.ts +++ b/reconf/src/failure/DefaultHandler.ts @@ -7,7 +7,8 @@ const logger = util.logger; export default class DefaultHandler implements FailureHandler { async handle(msg: { uuid: string; reason: string; }): Promise { logger.debug(`Default failure handler for ${msg.uuid} caused by ${msg.reason}`); - await service.stopService(msg.uuid); - await service.startService(msg.uuid); + await service.removeService(msg.uuid); + //await service.stopService(msg.uuid); + //await service.startService(msg.uuid); } } \ No newline at end of file diff --git a/reconf/src/neo4j.ts b/reconf/src/neo4j.ts index 72397eb..2d4d50e 100644 --- a/reconf/src/neo4j.ts +++ b/reconf/src/neo4j.ts @@ -1,6 +1,8 @@ import neo4j, { Driver } from 'neo4j-driver'; -import { env as getEnv, util } from 'horme-common'; +import { env as getEnv, util, DeviceMessage } from 'horme-common'; import { ServiceEntry } from './db'; +import { QueryResult } from 'neo4j-driver/types/result'; +import mqtt from 'async-mqtt'; const env = getEnv.readEnvironment('reconf'); const logger = util.logger; @@ -25,12 +27,12 @@ export async function connectNeo4j() { if (await finish) { driver = d; logger.info('Connected to Neo4j!'); + initDataListener(); return; } } catch (error) {} } - } //reset whole database @@ -46,20 +48,13 @@ export async function resetDatabase(): Promise{ } //execute query with return -export async function returnQuery(n :string): Promise { +export async function returnQuery(n :string): Promise { if(driver === undefined) { await connectNeo4j(); } const session = driver.session(); - let entireResult = ''; - await session.run(n).then(result => { - return result.records.map(record => { // Iterate through records - entireResult = record.get('n'); // Access the name property from the RETURN statement - }); - }) - .then(() => { - session.close();}); - return entireResult; + let result = await session.run(n); + return result; } //add all dependencies from services to other services @@ -70,29 +65,35 @@ async function updateAllDependencies(config: [string, ServiceEntry[]][]) { //Reset all current dependencies, as device dependencies may change during reconfiguration await resetAllDependencies(); +} - for (const element of config) { - for (const elem2 of element[1]) { - for (const deps of elem2.depends) { - - //if dependency dev exists - const dev: string = 'MATCH (n) WHERE n.uuid = \'' + deps + '\' RETURN n'; - const res = await returnQuery(dev); - if (res != '') { - - //check if relation already exists - const checkrel: string = 'MATCH (n)-[DEPENDS_ON]->(m) WHERE n.uuid = \'' + elem2.uuid + '\' AND m.uuid = \'' + deps + '\' RETURN n'; - const result = await returnQuery(checkrel); - if (result == '') { - - //create relation - const newrel: string = 'MATCH (n), (m) WHERE n.uuid = \'' + elem2.uuid + '\' AND m.uuid = \'' + deps + '\' CREATE (n)-[r:DEPENDS_ON]->(m)'; - await returnQuery(newrel); - } - } - } - } - } +export async function initDataListener() { + logger.info('Setup Database Listener...'); + // connect MQTT client + const client = await mqtt.connectAsync(env.host, env.auth); + // set MQTT client message event listener + client.on('message', (topic, msg) => { + updateState(topic, msg.toString()); + }); + await client.subscribe([ + `data/${process.env.HORME_APARTMENT}/bedroom/#`, + ]); +} + +export async function updateState(topic: string, message: string) { + + //search for id + var splitted = topic.split('_', 2); + var id = splitted[1]; + + //parse message to devicemsg + //extract value from devicemsg + const deviceMsg = DeviceMessage.check(JSON.parse(message)); + var val = deviceMsg.value; + + //set value in db + const finished: string = 'MATCH (n: Service { uuid: \'' + id + '\' }) SET n.state = \'' + val+ '\''; + await returnQuery(finished); } //Reset all current dependencies @@ -102,7 +103,7 @@ async function resetAllDependencies() { } const session = driver.session(); logger.info('Reset all Depends_Of relations...'); - await session.run('MATCH ()-[r:DEPENDS_ON]-() DELETE r') + await session.run('MATCH ()-[r:SUBSCRIBE]-() DELETE r') .then(() => { session.close();}); return; @@ -118,28 +119,6 @@ export async function addConfigToDB(config: [string, ServiceEntry[]][]): Promise //Walkaround for illegal '-' in typename let type = elem2.type; type = type.split('-').join('_'); - - //Check if Service does exist - const a: string = 'MATCH (n:' + type + ' { uuid: \'' + elem2.uuid + '\' }) RETURN n'; - if (await returnQuery(a) == '') { - const b: string = 'CREATE (n:' + type + ' { uuid: \'' + elem2.uuid + '\'})'; - await returnQuery(b); - - //check if Room exist - if(elem2.room) { - const room: string = 'MATCH (n:Room { name: \'' + elem2.room + '\'}) RETURN n'; - const me = await returnQuery(room); - if (me == '') { - const newroom: string = 'CREATE (n:Room { name: \'' + elem2.room + '\'})'; - await returnQuery(newroom); - } - - //set service and room in ralationship - const newroom: string = 'MATCH (n:Room), (m:' + type + ') WHERE n.name = \'' + elem2.room + '\' AND m.uuid = \'' + elem2.uuid + '\' CREATE (m)-[r:BELONGS_TO]->(n)'; - await returnQuery(newroom); - } - - } } } diff --git a/reconf/src/service.ts b/reconf/src/service.ts index c3c2647..44d2355 100644 --- a/reconf/src/service.ts +++ b/reconf/src/service.ts @@ -4,7 +4,7 @@ import fs from 'fs/promises'; import chalk from 'chalk'; import mqtt from 'async-mqtt'; -import db, { queryService, ServiceEntry, ServiceSelection } from './db'; +import db, { getSEfromUuid, ServiceEntry, removeService, initMissingServices, disableService } from './db'; import { env as getEnv, util, @@ -15,8 +15,9 @@ import { parseAs, } from 'horme-common'; import ServiceFactory from './service/ServiceFactory'; +import { returnQuery } from './neo4j'; -export default { cleanUp, configureServices, removeService, startService, stopService }; +export default { cleanUp, configureService: setDependencies, removeService: _removeService, stopService, startService }; /** The service UUID. */ export type Uuid = string; @@ -42,60 +43,17 @@ const services: Map = new Map(); const factory = new ServiceFactory(); const serviceNamePrefix = 'horme-'; -/** Instantiates and configures the set of services selected from the database. */ -async function configureServices(): Promise { - // query current service selection from database - const result = await db.queryServiceSelection(); - // instantiate all not yet instantiated services, insert them into global map - const instantiated = await instantiateServices(result); - // set and configure all service dependencies - await Promise.all(instantiated.map((args) => configureService(...args, true))); -} - -/** Removes the service with the given `uuid` and triggers a full service selection - * and configuration update. */ -async function removeService(uuid: string): Promise { - // retrieve updated service selection from database - const reconfiguration = await db.queryServiceSelection({ del: [uuid] }); - - const previousServices = Array.from(services.values()); - const newServices = Array.from( - reconfiguration.flatMap(([_, instances]) => { - return instances.map((instance) => instance.uuid); - }) - ); - - // determine services which are no longer present in updated service selection - const removals = previousServices.filter((prev) => !newServices.includes(prev.info.uuid)); - - // remove all services no longer present in the new configuration and kill their respective - // processes - for (const service of removals) { - logger.warn('killing process of service ' + chalk.underline(service.info.uuid)); - - stopService(service.info.uuid); - services.delete(service.info.uuid); - } - - // instantiate all new services - const instantiatedServices = await instantiateServices(reconfiguration); - - // configure all newly instantiated services and re-configure all changed services - logger.info('initiating service reconfiguration...'); - await Promise.all(instantiatedServices.map((args) => configureService(...args))); -} - -async function startService(uuid: string) { - const entry = await queryService(uuid); +export async function startService(uuid: string) { + const entry = await getSEfromUuid(uuid); if (!entry) return; const config = await readConfig(entry.type); if (!config) return; - logger.debug(`Start service ${uuid}`) + logger.debug(`Start service ${uuid}`); const handle = getServiceHandle(entry); handle.info.version++; // TODO: not here.. - configureService(handle, entry.depends); + const proc = _startService(entry, config, buildTopic(entry)); handle.proc = proc; updateServiceHandle(handle); @@ -112,34 +70,16 @@ function cleanUp(): void { execSync(`docker ps -a -q -f "name=${serviceNamePrefix}" | xargs -I {} docker rm {} `); } -/** Instantiates all (not yet instantiated) services in the given `selection`. */ -async function instantiateServices( - selection: ServiceSelection -): Promise<[ServiceHandle, Uuid[]][]> { - const promises = await Promise.all( - selection.map(async ([type, selected]) => { - const config = await readConfig(type); - if (!config) return []; - return await Promise.all( - Array.from(selected.map((sel) => instantiateService(sel, config))) - ); - }) - ); - - return promises.flat(); -} - async function readConfig(type: ServiceType): Promise { const file = await fs.readFile(`./config/services/${type}.json`); return parseAs(ServiceConfig, JSON.parse(file.toString())); } /** Instantiates a service of the given type/description/config if it does not already exist. */ -async function instantiateService( +export async function createService( entry: ServiceEntry, config: ServiceConfig -): Promise<[ServiceHandle, Uuid[]]> { - +): Promise<[ServiceHandle]> { const handle = services.get(entry.uuid); if (handle === undefined) { const topic = buildTopic(entry); @@ -148,15 +88,68 @@ async function instantiateService( handle.proc = proc; updateServiceHandle(handle); - return [handle, entry.depends]; + return [handle]; } else { updateServiceHandle(handle); - return [handle, entry.depends]; + return [handle]; + } +} + +/** Sets the dependencies of the corresponding service instance. */ +export async function setDependencies(uuid: string) { + const config: string = 'MATCH (m: Service { uuid: \'' + uuid +'\' }), ( n: Service ) WHERE (n)-[:SUBSCRIBE]->(m) RETURN n.uuid'; + let depen = await returnQuery(config); + let uuids: string[] = []; + for(let x of depen.records) { + uuids.push(x.get('n.uuid')); + } + let i = await getSEfromUuid(uuid); + if (typeof i !== 'undefined') { + let handle = getServiceHandle(i); + transferConfig(handle, uuids); + } else { + logger.error('No ServiceEntry found..'); } } +/** Removes the service with the given `uuid` and triggers a full service selection + * and configuration update. */ +async function _removeService(uuid: string): Promise { + + let se = await getSEfromUuid(uuid); + if (!se) return; + let toRemove = getServiceHandle(se); + + //search for all services that did get information from malfunctional device + const config: string = 'MATCH (n: Service { uuid: \'' + uuid +'\' }), ( m: Service ) WHERE (n)-[:SUBSCRIBE]->(m) RETURN m.uuid'; + let depen = await returnQuery(config); + + //retrieve updated service selection from database + await disableService(uuid); + + //set those services to unconfigured + for(let x of depen.records) { + const unconfservice: string = 'MATCH (n: Service)-[r:SUBSCRIBE]->(m: Service) WHERE m.uuid = \'' + x.get('m.uuid') + '\' DELETE r'; + await returnQuery(unconfservice); + + //delete all relations from them + const unconfigure: string = 'MATCH (n: Service { uuid: \'' + x.get('m.uuid') +'\', online: \'true\' }) SET n.configured = \'false\''; + await returnQuery(unconfigure); + } + + //reconfigure all these services + initMissingServices(); + + // remove all services no longer present in the new configuration and kill their respective + // processes + logger.warn('killing process of service ' + chalk.underline(toRemove.info.uuid)); + + stopService(toRemove.info.uuid); + services.delete(toRemove.info.uuid); +} + /** Sets the dependencies of the corresponding service instance. */ -async function configureService(service: ServiceHandle, depends: Uuid[], init = false) { +async function transferConfig(service: ServiceHandle, depends: Uuid[], init = false) { const { add, del } = setServiceDependencies(service, depends); const reconfigure = add.length && del.length; @@ -187,6 +180,10 @@ function _startService(entry: ServiceEntry, config: ServiceConfig, topic: string '-e', 'HORME_LOG_LEVEL=' + env.logLevel, '-e', + 'HORME_NEO4J_USER=' + process.env.HORME_MQTT_USER, + '-e', + 'HORME_NEO4J_PASS=' + process.env.HORME_MQTT_PASS, + '-e', 'HORME_MQTT_HOST=' + env.host, '-e', 'HORME_SERVICE_TOPIC=' + topic, @@ -292,5 +289,5 @@ export function buildTopic(entry: ServiceEntry): string { entry.room !== null ? `${process.env.HORME_APARTMENT}/${entry.room}` : `${process.env.HORME_APARTMENT}/global`; - return `${base}/${entry.type}${entry.uuid}`; + return `${base}/${entry.type}_${entry.uuid}`; } diff --git a/services/ceiling-lamp/src/app.ts b/services/ceiling-lamp/src/app.ts index cb5c0e7..5856e55 100644 --- a/services/ceiling-lamp/src/app.ts +++ b/services/ceiling-lamp/src/app.ts @@ -62,16 +62,13 @@ async function handleConfigMessage(client: AsyncMqttClient, topic: string, msg: logger.info('initial configuration received'); serviceInfo = config.info; } - const add = config.add.map(sub => 'data/' + sub.topic); const del = config.del.map(sub => 'data/' + sub.topic); - if (add.length > 0) { subCount += add.length; await client.subscribe(add); logger.debug('subscribed to topic(s): ' + add.join(', ')); } - if (del.length > 0) { subCount -= del.length; await client.subscribe(del); @@ -108,7 +105,6 @@ async function handleDataMessage( value: device.value, timestamp: new Date().getTime(), }; - await client.publish(sendTopic, JSON.stringify(response), { retain: true }); logger.debug(`retained state set to: '${device.value}'`) } \ No newline at end of file diff --git a/services/light-switch/src/app.ts b/services/light-switch/src/app.ts index a2c0947..6783291 100644 --- a/services/light-switch/src/app.ts +++ b/services/light-switch/src/app.ts @@ -46,8 +46,11 @@ async function main() { if (!isConfigured) { logger.info(`initial configuration received ${msg.info.version}`); if (msg.info.version === 0) { - // simulate a start error - process.exit(1); + if(msg.info.uuid === 'fra') { + // simulate a start error for fra + logger.info('INFO: simulated start error for ' + msg.info.uuid); + process.exit(1); + } } isConfigured = true; simulateSwitchActivity(