From 97f09c41a8dc7fceeaea6efb1a5c52fe10a7e0b5 Mon Sep 17 00:00:00 2001 From: lg Date: Fri, 25 Jun 2021 16:22:42 +0100 Subject: [PATCH 1/3] wip: support init from json config --- experimental/init_from_json.py | 9 +++++++++ srcs/cpp/include/kungfu/peer.hpp | 4 ++-- srcs/cpp/include/kungfu/python/c_api.h | 1 + srcs/cpp/src/peer.cpp | 9 +++++++++ srcs/cpp/src/python/init.cpp | 6 ++++++ srcs/go/kungfu/env/config_json.go | 21 +++++++++++++++++++++ srcs/go/kungfu/env/config_json_test.go | 1 + srcs/go/libkungfu-comm/main.go | 15 +++++++++++++++ srcs/python/kungfu/python/__init__.py | 24 +++++++++++++++++++----- 9 files changed, 83 insertions(+), 7 deletions(-) create mode 100644 experimental/init_from_json.py create mode 100644 srcs/go/kungfu/env/config_json.go create mode 100644 srcs/go/kungfu/env/config_json_test.go diff --git a/experimental/init_from_json.py b/experimental/init_from_json.py new file mode 100644 index 000000000..97c0f6aee --- /dev/null +++ b/experimental/init_from_json.py @@ -0,0 +1,9 @@ +import json + +from kungfu.python import init_from_config + +config = { + # 'workers' +} + +init_from_config(config) diff --git a/srcs/cpp/include/kungfu/peer.hpp b/srcs/cpp/include/kungfu/peer.hpp index b82e06f57..6f89d88ee 100644 --- a/srcs/cpp/include/kungfu/peer.hpp +++ b/srcs/cpp/include/kungfu/peer.hpp @@ -13,9 +13,9 @@ using TransformFunc = std::function(pJson)); + if (err) { + fprintf(stderr, "%s failed\n", "GoKungfuInitFromJSON"); + exit(1); + } +} + Peer::~Peer() { GoKungfuFinalize(); } bool Peer::Detached() const { return GoKungfuDetached(); } diff --git a/srcs/cpp/src/python/init.cpp b/srcs/cpp/src/python/init.cpp index f4c08472a..c1f377fce 100644 --- a/srcs/cpp/src/python/init.cpp +++ b/srcs/cpp/src/python/init.cpp @@ -39,6 +39,12 @@ void kungfu_python_init_single_machine(int rank, int size) _init_affinity(); } +extern void kungfu_python_init_from_json(const char *pJson) +{ + _default_peer.reset(new kungfu::Peer(pJson)); + _init_affinity(); +} + void 
kungfu_python_finialize() { _default_peer.reset(nullptr); }

 uint64_t kungfu_uid() { return _default_peer->Uid(); }
diff --git a/srcs/go/kungfu/env/config_json.go b/srcs/go/kungfu/env/config_json.go
new file mode 100644
index 000000000..bf37e9857
--- /dev/null
+++ b/srcs/go/kungfu/env/config_json.go
@@ -0,0 +1,21 @@
+package env
+
+import "encoding/json"
+
+type peer struct {
+}
+
+type kungfuConfig struct {
+	Peers []peer `json:"peers"`
+}
+
+func ParseConfigFromJSON(js string) (*Config, error) {
+	var kfConfig kungfuConfig
+	if err := json.Unmarshal([]byte(js), &kfConfig); err != nil {
+		return nil, err
+	}
+	return &Config{
+		// InitPeers: initPeers,
+		// Strategy: *strategy,
+	}, nil
+}
diff --git a/srcs/go/kungfu/env/config_json_test.go b/srcs/go/kungfu/env/config_json_test.go
new file mode 100644
index 000000000..13a09aeb7
--- /dev/null
+++ b/srcs/go/kungfu/env/config_json_test.go
@@ -0,0 +1 @@
+package env
diff --git a/srcs/go/libkungfu-comm/main.go b/srcs/go/libkungfu-comm/main.go
index b08e02d75..101a6672a 100644
--- a/srcs/go/libkungfu-comm/main.go
+++ b/srcs/go/libkungfu-comm/main.go
@@ -33,6 +33,21 @@ func GoKungfuInit() int {
 	return errorCode("Start", defaultPeer.Start())
 }
 
+//export GoKungfuInitFromJSON
+func GoKungfuInitFromJSON(pJSON *C.char) int {
+	js := C.GoString(pJSON)
+	log.Errorf("GoKungfuInitFromJSON: %s", js)
+	cfg, err := env.ParseConfigFromJSON(js)
+	if err != nil {
+		return errorCode("ParseConfigFromJSON", err) // cfg is nil on parse failure; must not fall through to NewFromConfig
+	}
+	defaultPeer, err = peer.NewFromConfig(cfg)
+	if err != nil {
+		return errorCode("NewFromConfig", err)
+	}
+	return errorCode("Start", defaultPeer.Start())
+}
+
 //export GoKungfuInitSingleMachine
 func GoKungfuInitSingleMachine(rank, size C.int) int {
 	cfg, err := env.SingleMachineEnv(int(rank), int(size))
diff --git a/srcs/python/kungfu/python/__init__.py b/srcs/python/kungfu/python/__init__.py
index 507f9d9e4..50c890013 100644
--- a/srcs/python/kungfu/python/__init__.py
+++ b/srcs/python/kungfu/python/__init__.py
@@ -1,5 +1,7 @@
import atexit +import json import os +from ctypes import byref, c_char, c_char_p, c_int from kungfu.loader import (_call_method, _call_method_with, _load_clib, _module_path) @@ -23,9 +25,23 @@ def _load_and_init_python_lib(): return _python_lib, has_nccl -_python_lib = None -_has_nccl = None -_python_lib, _has_nccl = _load_and_init_python_lib() +if os.getenv('KUNGFU_DISABLE_AUTO_LOAD'): + _python_lib = None + _has_nccl = None +else: + _python_lib, _has_nccl = _load_and_init_python_lib() + + +def init_from_config(config): + global _python_lib + global _has_nccl + _load_clib('libkungfu') + _python_lib = _load_clib('libkungfu_python') + js = json.dumps(config) + # https://stackoverflow.com/questions/61294630/ctypes-passing-a-·string-as-a-pointer-from-python-to-c + _call_method_with(_python_lib, 'kungfu_python_init_from_json', + c_char_p(js.encode())) + _has_nccl = _call_method(_python_lib, 'kungfu_python_init_nccl') def _init_single_machine_multiple_process(rank, size): @@ -129,8 +145,6 @@ def show_nccl_version(): # unstable APIs -from ctypes import byref, c_int, c_char - def _resize_from_url(): changed = c_char() From 1e13aa1ecfa992703eade56b6de9253b337abc7a Mon Sep 17 00:00:00 2001 From: lg Date: Sat, 26 Jun 2021 18:35:31 +0100 Subject: [PATCH 2/3] parse config --- experimental/init_from_json.py | 9 ------ experimental/init_from_json/main.py | 30 +++++++++++++++++++ experimental/init_from_json/run.sh | 13 +++++++++ srcs/go/kungfu/env/config_json.go | 45 ++++++++++++++++++++++++++--- 4 files changed, 84 insertions(+), 13 deletions(-) delete mode 100644 experimental/init_from_json.py create mode 100644 experimental/init_from_json/main.py create mode 100755 experimental/init_from_json/run.sh diff --git a/experimental/init_from_json.py b/experimental/init_from_json.py deleted file mode 100644 index 97c0f6aee..000000000 --- a/experimental/init_from_json.py +++ /dev/null @@ -1,9 +0,0 @@ -import json - -from kungfu.python import init_from_config - -config = { - # 'workers' 
-} - -init_from_config(config) diff --git a/experimental/init_from_json/main.py b/experimental/init_from_json/main.py new file mode 100644 index 000000000..354a7328e --- /dev/null +++ b/experimental/init_from_json/main.py @@ -0,0 +1,30 @@ +import argparse + +from kungfu.python import init_from_config + + +def parse_args(): + p = argparse.ArgumentParser() + p.add_argument('--index', type=int, default=0) + return p.parse_args() + + +def main(): + args = parse_args() + config = { + 'cluster': { + 'worker': [ + '127.0.0.1:10010', + '127.0.0.1:10011', + ], + }, + 'task': { + 'index': args.index, + }, + } + + init_from_config(config) + print('done') + + +main() diff --git a/experimental/init_from_json/run.sh b/experimental/init_from_json/run.sh new file mode 100755 index 000000000..16c2b5919 --- /dev/null +++ b/experimental/init_from_json/run.sh @@ -0,0 +1,13 @@ +#!/bin/sh +set -e + +cd $(dirname $0) + +export KUNGFU_DISABLE_AUTO_LOAD=1 + +python3 main.py --index 0 & +python3 main.py --index 1 & + +wait + +echo done diff --git a/srcs/go/kungfu/env/config_json.go b/srcs/go/kungfu/env/config_json.go index bf37e9857..73418c684 100644 --- a/srcs/go/kungfu/env/config_json.go +++ b/srcs/go/kungfu/env/config_json.go @@ -1,12 +1,45 @@ package env -import "encoding/json" +import ( + "encoding/json" + "net" + "strconv" -type peer struct { + "github.com/lsds/KungFu/srcs/go/plan" +) + +type taskID plan.PeerID + +func (t taskID) MarshalJSON() ([]byte, error) { + port := strconv.Itoa(int(t.Port)) + addr := net.JoinHostPort(plan.FormatIPv4(t.IPv4), port) + return json.Marshal(addr) +} + +func (t *taskID) UnmarshalJSON(bs []byte) error { + var s string + if err := json.Unmarshal(bs, &s); err != nil { + return err + } + id, err := plan.ParsePeerID(s) + if err != nil { + return err + } + *t = taskID(*id) + return nil +} + +type clusterSpec struct { + Worker []taskID `json:"worker"` +} + +type taskSpec struct { + Index int `json:"index"` } type kungfuConfig struct { - Peers []peer 
`json:"peers"`
+	Cluster clusterSpec `json:"cluster"`
+	Task    taskSpec    `json:"task"`
 }
 
 func ParseConfigFromJSON(js string) (*Config, error) {
@@ -14,8 +47,12 @@ func ParseConfigFromJSON(js string) (*Config, error) {
 	if err := json.Unmarshal([]byte(js), &kfConfig); err != nil {
 		return nil, err
 	}
+	var initPeers plan.PeerList
+	for _, p := range kfConfig.Cluster.Worker {
+		initPeers = append(initPeers, plan.PeerID(p))
+	}
 	return &Config{
-		// InitPeers: initPeers,
+		InitPeers: initPeers,
 		// Strategy: *strategy,
 	}, nil
 }

From 5041cf7eeef14557542d80b41e2294efaf3f9124 Mon Sep 17 00:00:00 2001
From: lg
Date: Mon, 28 Jun 2021 21:28:45 +0100
Subject: [PATCH 3/3] fix

---
 experimental/init_from_json/main.py   | 19 ++++++++++++++-----
 experimental/init_from_json/run.sh    |  4 ++--
 srcs/go/kungfu/env/config_json.go     | 34 ++++++++++++------
 srcs/python/kungfu/python/__init__.py | 11 +++++++----
 4 files changed, 45 insertions(+), 23 deletions(-)

diff --git a/experimental/init_from_json/main.py b/experimental/init_from_json/main.py
index 354a7328e..dcac601f1 100644
--- a/experimental/init_from_json/main.py
+++ b/experimental/init_from_json/main.py
@@ -1,11 +1,13 @@
 import argparse

-from kungfu.python import init_from_config
+import tensorflow as tf
+from kungfu.python import current_cluster_size, current_rank, init_from_config
+from kungfu.tensorflow.ops import all_reduce


 def parse_args():
     p = argparse.ArgumentParser()
-    p.add_argument('--index', type=int, default=0)
+    p.add_argument('--rank', type=int, default=0)
     return p.parse_args()


@@ -13,17 +15,24 @@ def main():
     args = parse_args()
     config = {
         'cluster': {
-            'worker': [
+            'peers': [
                 '127.0.0.1:10010',
                 '127.0.0.1:10011',
             ],
         },
-        'task': {
-            'index': args.index,
+        'self': {
+            'rank': args.rank,
         },
     }

     init_from_config(config)
+
+    rank = current_rank()
+    size = current_cluster_size()
+    print('%d/%d' % (rank, size))
+    x = tf.Variable(1 + int(rank), dtype=tf.int32)
+    y = all_reduce(x)
+    print(x, y)
     print('done')


diff --git a/experimental/init_from_json/run.sh b/experimental/init_from_json/run.sh
index 16c2b5919..922c64a50 100755
--- a/experimental/init_from_json/run.sh
+++ b/experimental/init_from_json/run.sh
@@ -5,8 +5,8 @@ cd $(dirname $0)

 export KUNGFU_DISABLE_AUTO_LOAD=1

-python3 main.py --index 0 &
-python3 main.py --index 1 &
+python3 main.py --rank 0 &
+python3 main.py --rank 1 &

 wait

diff --git a/srcs/go/kungfu/env/config_json.go b/srcs/go/kungfu/env/config_json.go
index 73418c684..6a4eb4c45 100644
--- a/srcs/go/kungfu/env/config_json.go
+++ b/srcs/go/kungfu/env/config_json.go
@@ -3,20 +3,22 @@ package env
 import (
 	"encoding/json"
+	"fmt"
 	"net"
 	"strconv"

+	"github.com/lsds/KungFu/srcs/go/kungfu/base"
 	"github.com/lsds/KungFu/srcs/go/plan"
 )

-type taskID plan.PeerID
+type peerID plan.PeerID // customized JSON encoding

-func (t taskID) MarshalJSON() ([]byte, error) {
-	port := strconv.Itoa(int(t.Port))
-	addr := net.JoinHostPort(plan.FormatIPv4(t.IPv4), port)
+func (p peerID) MarshalJSON() ([]byte, error) {
+	port := strconv.Itoa(int(p.Port))
+	addr := net.JoinHostPort(plan.FormatIPv4(p.IPv4), port)
 	return json.Marshal(addr)
 }

-func (t *taskID) UnmarshalJSON(bs []byte) error {
+func (p *peerID) UnmarshalJSON(bs []byte) error {
 	var s string
 	if err := json.Unmarshal(bs, &s); err != nil {
 		return err
@@ -25,34 +27,42 @@ func (t *taskID) UnmarshalJSON(bs []byte) error {
 	if err != nil {
 		return err
 	}
-	*t = taskID(*id)
+	*p = peerID(*id)
 	return nil
 }

 type clusterSpec struct {
-	Worker []taskID `json:"worker"`
+	Peers []peerID `json:"peers"`
 }

-type taskSpec struct {
-	Index int `json:"index"`
+type peerSpec struct {
+	Rank int `json:"rank"`
 }

 type kungfuConfig struct {
 	Cluster clusterSpec `json:"cluster"`
-	Task    taskSpec    `json:"task"`
+	Self    peerSpec    `json:"self"`
 }

+// ParseConfigFromJSON builds a Config from a JSON document shaped like
+// {"cluster": {"peers": ["ip:port", ...]}, "self": {"rank": N}}.
+// It fails if the JSON is invalid or self.rank does not index a peer.
 func ParseConfigFromJSON(js string) (*Config, error) {
 	var kfConfig kungfuConfig
 	if err := json.Unmarshal([]byte(js), &kfConfig); err != nil {
 		return nil, err
 	}
 	var initPeers plan.PeerList
-	for _, p := range kfConfig.Cluster.Worker {
+	for _, p := range kfConfig.Cluster.Peers {
 		initPeers = append(initPeers, plan.PeerID(p))
 	}
+	rank := kfConfig.Self.Rank
+	if rank < 0 || rank >= len(initPeers) {
+		return nil, fmt.Errorf("self.rank %d out of range for %d peers", rank, len(initPeers))
+	}
 	return &Config{
 		InitPeers: initPeers,
-		// Strategy: *strategy,
+		Self:      initPeers[rank],
+		Strategy:  base.DefaultStrategy,
 	}, nil
 }
diff --git a/srcs/python/kungfu/python/__init__.py b/srcs/python/kungfu/python/__init__.py
index 50c890013..e65918e33 100644
--- a/srcs/python/kungfu/python/__init__.py
+++ b/srcs/python/kungfu/python/__init__.py
@@ -25,18 +25,21 @@ def _load_and_init_python_lib():
     return _python_lib, has_nccl


+def _load_python_lib():
+    _load_clib('libkungfu')
+    _python_lib = _load_clib('libkungfu_python')
+    return _python_lib
+
+
 if os.getenv('KUNGFU_DISABLE_AUTO_LOAD'):
-    _python_lib = None
+    _python_lib = _load_python_lib()
     _has_nccl = None
 else:
     _python_lib, _has_nccl = _load_and_init_python_lib()


 def init_from_config(config):
-    global _python_lib
     global _has_nccl
-    _load_clib('libkungfu')
-    _python_lib = _load_clib('libkungfu_python')
     js = json.dumps(config)
     # https://stackoverflow.com/questions/61294630/ctypes-passing-a-·string-as-a-pointer-from-python-to-c
     _call_method_with(_python_lib, 'kungfu_python_init_from_json',