Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions experimental/init_from_json/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import argparse

import tensorflow as tf
from kungfu.python import current_cluster_size, current_rank, init_from_config
from kungfu.tensorflow.ops import all_reduce


def parse_args():
    """Parse the command line of this example.

    Returns:
        argparse.Namespace with a single attribute, ``rank`` (int), taken
        from ``--rank`` and defaulting to 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--rank', default=0, type=int)
    parsed = parser.parse_args()
    return parsed


def main():
    """Initialise KungFu from an in-process config and run one all-reduce.

    The cluster is a fixed pair of peers on localhost; the rank of this
    process within that list comes from the ``--rank`` command-line option.
    After initialisation the rank and cluster size are printed, followed by
    a variable holding ``1 + rank`` and the result of all-reducing it.
    """
    args = parse_args()

    peers = [
        '127.0.0.1:10010',
        '127.0.0.1:10011',
    ]
    config = {
        'cluster': {'peers': peers},
        'self': {'rank': args.rank},
    }
    init_from_config(config)

    rank, size = current_rank(), current_cluster_size()
    print('%d/%d' % (rank, size))

    x = tf.Variable(1 + int(rank), dtype=tf.int32)
    y = all_reduce(x)
    print(x, y)

    print('done')


# Run the demo only when executed as a script (as run.sh does), so that merely
# importing this module does not try to initialise a KungFu cluster.
if __name__ == '__main__':
    main()
13 changes: 13 additions & 0 deletions experimental/init_from_json/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh
# Starts the two ranks of the init_from_json example on this machine and
# fails if either of them fails.
set -e

# Quote both expansions so a checkout path containing spaces still works.
cd "$(dirname "$0")"

# Tell kungfu.python to only load its shared library at import time; main.py
# performs the initialisation itself through init_from_config().
export KUNGFU_DISABLE_AUTO_LOAD=1

python3 main.py --rank 0 &
pid0=$!
python3 main.py --rank 1 &
pid1=$!

# A bare `wait` returns 0 regardless of how the background jobs exited, so
# `set -e` would never notice a failed worker. Wait for each PID explicitly
# and remember any non-zero status; `|| status=$?` also keeps `set -e` from
# aborting before the second worker has been reaped.
status=0
wait "$pid0" || status=$?
wait "$pid1" || status=$?

if [ "$status" -ne 0 ]; then
    echo "worker failed with exit status $status" >&2
    exit "$status"
fi

echo done
4 changes: 3 additions & 1 deletion srcs/cpp/include/kungfu/peer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ using TransformFunc = std::function<void(
class Peer
{
public:
static std::unique_ptr<Peer> &GetDefault(bool reinit = false);

static std::unique_ptr<Peer> &GetDefault(bool reinit = false);

Peer();

Peer(int rank, int size); // Single Machine Multi-Process
Peer(const char *pJson); // init from JSON

~Peer();

Expand Down
1 change: 1 addition & 0 deletions srcs/cpp/include/kungfu/python/c_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ extern "C" {

extern void kungfu_python_init();
extern void kungfu_python_init_single_machine(int rank, int size);
// Replaces the default peer with one constructed from the C string pJson,
// which carries a JSON configuration of the form
//   {"cluster": {"peers": ["host:port", ...]}, "self": {"rank": N}}
// The process exits with status 1 if the underlying initialisation fails.
extern void kungfu_python_init_from_json(const char *pJson);
extern void kungfu_python_init_nccl();

extern void kungfu_python_finialize();
Expand Down
9 changes: 9 additions & 0 deletions srcs/cpp/src/peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ Peer::Peer(int rank, int size)
}
}

// Constructs a peer from a JSON configuration string by handing it to
// GoKungfuInitFromJSON. A non-zero result is treated as fatal: a message is
// written to stderr and the process exits with status 1.
Peer::Peer(const char *pJson)
{
    // GoKungfuInitFromJSON is called with a non-const char *, hence the cast.
    char *json = const_cast<char *>(pJson);
    const int status = GoKungfuInitFromJSON(json);
    if (status != 0) {
        fprintf(stderr, "%s failed\n", "GoKungfuInitFromJSON");
        exit(1);
    }
}

// Destroying the peer calls GoKungfuFinalize.
Peer::~Peer()
{
    GoKungfuFinalize();
}

// Returns the value reported by GoKungfuDetached, converted to bool.
bool Peer::Detached() const
{
    return GoKungfuDetached();
}
Expand Down
6 changes: 6 additions & 0 deletions srcs/cpp/src/python/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ void kungfu_python_init_single_machine(int rank, int size)
_init_affinity();
}

// Replaces the default peer with one built from the JSON configuration string
// pJson, then calls _init_affinity().
void kungfu_python_init_from_json(const char *pJson)
{
    auto &default_peer = kungfu::Peer::GetDefault();
    default_peer.reset(new kungfu::Peer(pJson));
    _init_affinity();
}

// Releases the default peer by resetting its owning pointer to null.
void kungfu_python_finialize()
{
    kungfu::Peer::GetDefault().reset(nullptr);
}

// Returns Uid() of the default peer.
uint64_t kungfu_uid()
{
    return kungfu::Peer::GetDefault()->Uid();
}
Expand Down
60 changes: 60 additions & 0 deletions srcs/go/kungfu/env/config_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package env

import (
"encoding/json"
"net"
"strconv"

"github.com/lsds/KungFu/srcs/go/kungfu/base"
"github.com/lsds/KungFu/srcs/go/plan"
)

// peerID is plan.PeerID under a local name so that this package can attach
// its own MarshalJSON/UnmarshalJSON methods, which represent a peer as a
// single "host:port" JSON string.
type peerID plan.PeerID // customized JSON encoding

// MarshalJSON encodes the peer as a JSON string holding its IPv4 address and
// port joined by net.JoinHostPort.
func (p peerID) MarshalJSON() ([]byte, error) {
	host := plan.FormatIPv4(p.IPv4)
	hostPort := net.JoinHostPort(host, strconv.Itoa(int(p.Port)))
	return json.Marshal(hostPort)
}

// UnmarshalJSON decodes a JSON string and parses it with plan.ParsePeerID,
// storing the result in *p. An error from either step is returned unchanged
// and leaves *p untouched.
func (p *peerID) UnmarshalJSON(bs []byte) error {
	var addr string
	err := json.Unmarshal(bs, &addr)
	if err != nil {
		return err
	}
	parsed, err := plan.ParsePeerID(addr)
	if err != nil {
		return err
	}
	*p = peerID(*parsed)
	return nil
}

// clusterSpec is the "cluster" section of the JSON configuration.
type clusterSpec struct {
	// Peers lists the peers of the cluster, each written as a "host:port"
	// string (see peerID).
	Peers []peerID `json:"peers"`
}

// peerSpec is the "self" section of the JSON configuration.
type peerSpec struct {
	// Rank is used by ParseConfigFromJSON as the index of this process in
	// clusterSpec.Peers.
	Rank int `json:"rank"`
}

// kungfuConfig is the top-level JSON document accepted by
// ParseConfigFromJSON.
type kungfuConfig struct {
	Cluster clusterSpec `json:"cluster"`
	Self peerSpec `json:"self"`
}

// rankRangeError reports a "self.rank" value that does not index into
// "cluster.peers".
type rankRangeError struct {
	rank int
	size int
}

// Error implements the error interface.
func (e *rankRangeError) Error() string {
	return "invalid self rank " + strconv.Itoa(e.rank) +
		" for a cluster of " + strconv.Itoa(e.size) + " peers"
}

// ParseConfigFromJSON builds a Config from a JSON document of the form
//
//	{"cluster": {"peers": ["host:port", ...]}, "self": {"rank": N}}
//
// InitPeers is the decoded peer list in document order, Self is the peer at
// index N of that list, and Strategy is base.DefaultStrategy.
//
// An error is returned when the document cannot be decoded, or when N is not a
// valid index into the peer list (which includes the case of an empty or
// missing peer list). Previously the latter case panicked with an
// index-out-of-range error instead of reporting a configuration problem.
func ParseConfigFromJSON(js string) (*Config, error) {
	var kfConfig kungfuConfig
	if err := json.Unmarshal([]byte(js), &kfConfig); err != nil {
		return nil, err
	}
	rank, size := kfConfig.Self.Rank, len(kfConfig.Cluster.Peers)
	if rank < 0 || rank >= size {
		return nil, &rankRangeError{rank: rank, size: size}
	}
	initPeers := make(plan.PeerList, 0, size)
	for _, p := range kfConfig.Cluster.Peers {
		initPeers = append(initPeers, plan.PeerID(p))
	}
	return &Config{
		InitPeers: initPeers,
		Self:      initPeers[rank],
		Strategy:  base.DefaultStrategy,
	}, nil
}
1 change: 1 addition & 0 deletions srcs/go/kungfu/env/config_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package env
15 changes: 15 additions & 0 deletions srcs/go/libkungfu-comm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ func GoKungfuInit() int {
return errorCode("Start", defaultPeer.Start())
}

// GoKungfuInitFromJSON initialises the default peer from the JSON
// configuration held in the C string pJSON: the configuration is parsed with
// env.ParseConfigFromJSON, a peer is created from it and stored in
// defaultPeer, and the peer is started.
//
// The returned int is whatever errorCode yields for the first step that
// failed, or for the result of Start when the earlier steps succeeded.
//
//export GoKungfuInitFromJSON
func GoKungfuInitFromJSON(pJSON *C.char) int {
	js := C.GoString(pJSON)
	log.Errorf("GoKungfuInitFromJSON: %s", js)
	cfg, err := env.ParseConfigFromJSON(js)
	if err != nil {
		// Stop here: cfg is nil when parsing fails, so it must not be handed
		// on to peer.NewFromConfig. The original dropped this return value
		// and carried on.
		return errorCode("ParseConfigFromJSON", err)
	}
	defaultPeer, err = peer.NewFromConfig(cfg)
	if err != nil {
		return errorCode("NewFromConfig", err)
	}
	return errorCode("Start", defaultPeer.Start())
}

//export GoKungfuInitSingleMachine
func GoKungfuInitSingleMachine(rank, size C.int) int {
cfg, err := env.SingleMachineEnv(int(rank), int(size))
Expand Down
27 changes: 22 additions & 5 deletions srcs/python/kungfu/python/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import atexit
import json
import os
from ctypes import byref, c_char, c_char_p, c_int

from kungfu.loader import (_call_method, _call_method_with, _load_clib,
_module_path)
Expand All @@ -23,9 +25,26 @@ def _load_and_init_python_lib():
return _python_lib, has_nccl


_python_lib = None
_has_nccl = None
_python_lib, _has_nccl = _load_and_init_python_lib()
def _load_python_lib():
    """Load the KungFu shared libraries without initialising anything.

    ``libkungfu`` is loaded first and its handle discarded, then
    ``libkungfu_python`` is loaded.

    Returns:
        The handle returned by ``_load_clib`` for ``libkungfu_python``.
    """
    _load_clib('libkungfu')
    return _load_clib('libkungfu_python')


# By default the library is loaded and initialised at import time. When
# KUNGFU_DISABLE_AUTO_LOAD is set to a non-empty value it is only loaded, and
# _has_nccl stays None until an explicit initialiser such as
# init_from_config() assigns it.
if not os.getenv('KUNGFU_DISABLE_AUTO_LOAD'):
    _python_lib, _has_nccl = _load_and_init_python_lib()
else:
    _python_lib, _has_nccl = _load_python_lib(), None


def init_from_config(config):
    """Initialise KungFu from an in-memory configuration.

    Args:
        config: a JSON-serialisable object; it is encoded with ``json.dumps``
            and passed to ``kungfu_python_init_from_json`` in the loaded
            library.

    Side effects:
        Rebinds the module-level ``_has_nccl`` to the result of calling
        ``kungfu_python_init_nccl``.
    """
    global _has_nccl
    # The C side takes a ``const char *``, so pass the encoded bytes wrapped
    # in a c_char_p. See
    # https://stackoverflow.com/questions/61294630/ctypes-passing-a-string-as-a-pointer-from-python-to-c
    payload = json.dumps(config).encode()
    _call_method_with(_python_lib, 'kungfu_python_init_from_json',
                      c_char_p(payload))
    _has_nccl = _call_method(_python_lib, 'kungfu_python_init_nccl')


def _init_single_machine_multiple_process(rank, size):
Expand Down Expand Up @@ -129,8 +148,6 @@ def show_nccl_version():

# unstable APIs

from ctypes import byref, c_int, c_char


def _resize_from_url():
changed = c_char()
Expand Down