Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

generalized client and server code #4

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tensorflow as tf
import os
import numpy as np
import uuid
import glob
from make_model import sample_net

class Client():
    """One federated-learning client: builds a local Bayesian model,
    saves its prior, trains on local data, and exchanges weight files
    with the server via the shared 'server/' directory.
    """

    def __init__(self, clientid):
        """Create a client, build its model, and persist the prior weights.

        clientid: string identifier used in all weight filenames.
        """
        self.client_id = clientid
        self.load_input_output()
        self.model = sample_net(np.shape(self.inputs))
        self.save_prior()

    def load_input_output(self):
        # example inputs/outputs taken from tf keras Model documentation
        # in use case, load data from files
        self.inputs = tf.keras.Input(shape=(3,))
        x = tf.keras.layers.Dense(4, activation=tf.nn.relu)(self.inputs)
        self.outputs = tf.keras.layers.Dense(5, activation=tf.nn.softmax)(x)

    def save_prior(self):
        """Persist the freshly initialized weights as this client's prior."""
        self.model.save_weights('prior-'+self.client_id+'.h5', save_format = 'h5')

    def train(self):
        """Compile the model and run one epoch of local training."""
        _op = 'adam'
        _loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        _metrics = ['accuracy']
        self.model.compile(optimizer=_op, loss=_loss, metrics=_metrics)
        # Model.fit takes training data as x/y; the previous
        # fit(inputs=..., outputs=...) kwargs do not exist and raised TypeError.
        self.model.fit(x=self.inputs, y=self.outputs, epochs=1, verbose=2)

    def load_consolidated(self):
        """Load the newest consolidated checkpoint from the server directory.

        Raises ValueError when the directory is empty (max of empty sequence).
        """
        # Relative 'server/' — save_weights below writes there; the previous
        # '/server/*' pattern searched the filesystem root instead.
        list_of_files = glob.glob('server/*')
        latest_file = max(list_of_files, key=os.path.getctime)
        self.model.load_weights(latest_file)

    def save_weights(self):
        """Publish this client's current weights for server-side consolidation."""
        filename = 'server/consolidated-'+self.client_id+'-'+str(uuid.uuid4())+'.h5'
        self.model.save_weights(filename, save_format = 'h5')

# example usage

a = Client('A')
try:
    a.load_consolidated()
except (ValueError, OSError):
    # No consolidated checkpoint yet: an empty server/ dir raises ValueError
    # (max of empty sequence) and a missing/corrupt file raises OSError.
    # Start from the freshly saved prior instead. The previous bare
    # `except:` also swallowed KeyboardInterrupt/SystemExit.
    pass
a.train()
a.save_weights()
74 changes: 74 additions & 0 deletions dwc_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tensorflow as tf
import tensorflow_probability as tfp
import os
import numpy as np
#import asyncio
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse
#import websockets
#import uuid
import glob

# FastAPI application serving this client's weight-exchange endpoints.
app = FastAPI()

# Identifier for this client; used in weight filenames (e.g. 'prior-A.h5').
client_id = 'A'

# make dummy input/output data

# Placeholder training data — NOTE(review): left as None here, so
# sample_net(np.shape(inputs)) and train() below run against empty data;
# presumably real tensors are loaded from files in deployment — confirm.
inputs = None
outputs = None

# set up lightweight bayesian neural network
def sample_net(input_shape,
               activation = tf.nn.relu,
               batch_size = None,
               filters = 8):
    """Build a one-layer lightweight Bayesian conv net.

    Parameters
    ----------
    input_shape : tuple
        Shape of a single input sample (excluding batch dimension).
    activation : callable
        Activation applied both inside the conv layer and after it.
    batch_size : int or None
        Optional fixed batch size for the input layer.
    filters : int
        Number of convolution filters. New defaulted parameter:
        Convolution3DReparameterization requires `filters` as its first
        argument, so the original call raised TypeError.

    Returns
    -------
    tf.keras.Model mapping the input tensor to the layer's output.
    """
    def one_layer(x, dilation_rate=(1, 1, 1)):
        # Bayesian 3-D convolution whose kernel is sampled via the
        # reparameterization trick (tfp flipout/reparameterization family).
        x = tfp.layers.Convolution3DReparameterization(
            filters,
            kernel_size=3,
            padding="same",
            dilation_rate=dilation_rate,
            activation=activation,
            name="layer/vwnconv3d",
        )(x)
        x = tf.keras.layers.Activation(activation, name="layer/activation")(x)
        return x

    # tf.keras.Input (capital I) is the functional-API entry point;
    # tf.keras.layers.input does not exist and raised AttributeError.
    inputs = tf.keras.Input(shape = input_shape, batch_size=batch_size, name="inputs")
    x = one_layer(inputs)

    return tf.keras.Model(inputs=inputs, outputs=x)



# Build the client model and persist its initial weights as the prior.
# NOTE(review): `inputs` is still None at this point, so np.shape(inputs)
# is () — presumably real data is loaded first in deployment; confirm.
model = sample_net(np.shape(inputs))
model.save_weights('prior-'+client_id+'.h5', save_format = 'h5')



def train(inputs, outputs, model):
    """Compile *model* and fit it for one epoch on (inputs, outputs).

    inputs/outputs: training features and labels (x/y in Keras terms).
    model: a compiled-or-not tf.keras.Model; it is compiled here.
    """
    # input shape: 4 x 1
    _op = 'adam'
    _loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    _metrics = ['accuracy']
    model.compile(optimizer=_op, loss=_loss, metrics=_metrics)
    # Model.fit takes data as x/y — the previous fit(inputs=..., outputs=...)
    # kwargs do not exist in the Keras API and raised TypeError.
    model.fit(x=inputs, y=outputs, epochs=1, verbose=2)


def most_recent_consolidated():
    """Return the path of the newest file in the local 'server' directory.

    Raises FileNotFoundError when no consolidated checkpoint exists yet,
    instead of the opaque ValueError from max() on an empty sequence.
    """
    # Relative 'server/' — that is where consolidated checkpoints are
    # written; the previous '/server/*' searched the filesystem root.
    list_of_files = glob.glob('server/*')
    if not list_of_files:
        raise FileNotFoundError('no consolidated weight files found in server/')
    return max(list_of_files, key=os.path.getctime)

# ping server side
# ping server side
@app.get('/')
async def send_weights():
    """Serve this client's current model weight file to the server."""
    weights_file = 'model-' + client_id + '.h5'
    return FileResponse(weights_file)

@app.post("/")
async def load_consolidated():
model.load_weights(most_recent_consolidated())
train(inputs, outputs, model)
return {'consolidated weights': model}
35 changes: 35 additions & 0 deletions dwc_implementation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: Aakanksha Rana, Emi Z Liu
"""
import numpy as np

def distributed_weight_consolidation(model_weights, model_priors):
    """Consolidate Bayesian client models via distributed weight consolidation.

    Parameters
    ----------
    model_weights : list
        One flat list per client, alternating [mean_0, std_0, mean_1, std_1,
        ...] layer parameters (means at even indices, stds at odd indices).
    model_priors : list
        Matching per-client prior parameters in the same layout.

    Returns
    -------
    list
        A new list in the same alternating layout holding consolidated means
        and variances. The input lists are left unmodified.
    """
    num_layers = int(len(model_weights[0])/2.0)
    num_datasets = np.shape(model_weights)[0]
    # Copy the outer list: writing results into an alias of model_weights[0]
    # (as before) silently clobbered the first client's weight list.
    consolidated_model = list(model_weights[0])
    mean_idx = [i for i in range(len(model_weights[0])) if i % 2 == 0]
    std_idx = [i for i in range(len(model_weights[0])) if i % 2 != 0]
    ep = 1e-5  # numerical guard against zero variances
    for i in range(num_layers):
        num_1 = 0; num_2 = 0
        den_1 = 0; den_2 = 0
        for m in range(num_datasets):
            client = model_weights[m]
            prior = model_priors[m]
            mu_s = client[mean_idx[i]]   # posterior mean for this layer
            mu_o = prior[mean_idx[i]]    # prior mean
            sig_s = client[std_idx[i]]   # posterior std
            sig_o = prior[std_idx[i]]    # prior std
            d1 = np.power(sig_s, 2) + ep
            d2 = np.power(sig_o, 2) + ep
            num_1 += (mu_s/d1)
            num_2 += (mu_o/d2)
            den_1 += (1.0/d1)
            # DWC subtracts only (num_datasets - 1) prior precisions, so the
            # last dataset's prior is excluded from the denominator.
            if m != num_datasets-1:
                den_2 += (1.0/d2)
        consolidated_model[mean_idx[i]] = (num_1 - num_2)/(den_1 - den_2)
        consolidated_model[std_idx[i]] = 1/(den_1 - den_2)
    return consolidated_model

96 changes: 96 additions & 0 deletions dwc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tensorflow as tf
import os
#import asyncio
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse
#import websockets
import copy
import numpy as np
import uuid

# FastAPI application serving the consolidation endpoints.
app = FastAPI()

# Shared server-side state, mutated by the helpers below.
datasets = []        # deep-copied client models, one per received weight file
num_clients = 1      # expected number of participating clients
model = None         # working model used to load/consolidate weights
priors = []          # deep-copied prior models, parallel to client_ids
client_ids = []      # client identifiers parsed from 'model-<id>.h5' filenames

@app.get("/")
async def root():
return "root"

@app.get("/{model}/")
async def update_model(model: tf.keras.Model):
return FileResponse('consolidated.h5')

@app.post("/weights")
async def load_weights(image: UploadFile = File(...)): #do we still need to load a file here
model.load_weights(image.filename)
datasets.append(copy.deepcopy(model))
dwc_frame()
return {'consolidated weights': model}

def load_data():
    """Scan the working tree for client weight files ('model-<id>.h5') and
    repopulate the module-level `datasets` and `client_ids` lists.

    Mutates the shared lists in place. The previous version rebound local
    names `datasets`/`client_ids`, so every result was discarded and the
    module-level lists stayed empty.
    """
    datasets.clear()
    client_ids.clear()
    path = os.getcwd()
    for root, dirs, files in os.walk(path):
        for filename in files:
            if filename[:6] != 'model-':
                continue
            model.load_weights(filename)
            datasets.append(copy.deepcopy(model))
            # 'model-<client_id>.h5' -> '<client_id>'
            client_ids.append(filename[6:-3])

def load_priors():
    """Populate the module-level `priors` list with one deep-copied model per
    known client, loaded from that client's 'prior-<id>.h5' file.

    Loads each prior into the shared `model` and snapshots it with deepcopy,
    mirroring load_data(). The previous version called load_weights on a
    None placeholder, which always raised AttributeError.
    """
    if not priors:
        for client in client_ids:
            filename = 'prior-'+client+'.h5'
            model.load_weights(filename)
            priors.append(copy.deepcopy(model))

async def dwc_frame():
    """Run one consolidation round: gather client weights and priors,
    consolidate them, persist the result, and publish it."""
    # Rebind the shared model; a plain assignment (as before) created a
    # local that was discarded when this coroutine returned.
    global model
    load_data()
    load_priors()

    # distributed_weight_consolidation is a coroutine and must be awaited;
    # previously `model` was assigned an un-run coroutine object.
    model = await distributed_weight_consolidation(datasets, priors)

    # NOTE(review): the consolidation returns a flat list of weight arrays,
    # not a tf.keras.Model — save_weights presumably requires setting the
    # weights back onto a model first; confirm intended flow.
    model.save_weights(
        'server/consolidated-'+str(uuid.uuid4())+'.h5',
        save_format = 'h5')

    # update_model is also a coroutine; await so the publish step runs.
    await update_model(model)

async def distributed_weight_consolidation(model_weights, model_priors):
    """Consolidate Bayesian client models via distributed weight consolidation.

    Parameters
    ----------
    model_weights : list
        One flat list per client, alternating [mean_0, std_0, mean_1, std_1,
        ...] layer parameters (means at even indices, stds at odd indices).
    model_priors : list
        Matching per-client prior parameters in the same layout.

    Returns
    -------
    list
        A new list in the same alternating layout holding consolidated means
        and variances. The input lists are left unmodified.
    """
    num_layers = int(len(model_weights[0])/2.0)
    num_datasets = np.shape(model_weights)[0]
    # Copy the outer list: writing results into an alias of model_weights[0]
    # (as before) silently clobbered the first client's weight list.
    consolidated_model = list(model_weights[0])
    mean_idx = [i for i in range(len(model_weights[0])) if i % 2 == 0]
    std_idx = [i for i in range(len(model_weights[0])) if i % 2 != 0]
    ep = 1e-5  # numerical guard against zero variances
    for i in range(num_layers):
        num_1 = 0; num_2 = 0
        den_1 = 0; den_2 = 0
        for m in range(num_datasets):
            # renamed from `model` to avoid shadowing the module-level model
            client_model = model_weights[m]
            prior = model_priors[m]
            mu_s = client_model[mean_idx[i]]  # posterior mean for this layer
            mu_o = prior[mean_idx[i]]         # prior mean
            sig_s = client_model[std_idx[i]]  # posterior std
            sig_o = prior[std_idx[i]]         # prior std
            d1 = np.power(sig_s, 2) + ep
            d2 = np.power(sig_o, 2) + ep
            num_1 += (mu_s/d1)
            num_2 += (mu_o/d2)
            den_1 += (1.0/d1)
            # DWC subtracts only (num_datasets - 1) prior precisions, so the
            # last dataset's prior is excluded from the denominator.
            if m != num_datasets-1:
                den_2 += (1.0/d2)
        consolidated_model[mean_idx[i]] = (num_1 - num_2)/(den_1 - den_2)
        consolidated_model[std_idx[i]] = 1/(den_1 - den_2)
    return consolidated_model

63 changes: 63 additions & 0 deletions fedavg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import tensorflow as tf
from tensorflow_federated.python.learning import federated_averaging
import os
import asyncio
from fastapi import FastAPI, WebSocket, File, UploadFile
from fastapi.responses import HTMLResponse
import websockets
import copy

# FastAPI application serving the federated-averaging endpoints.
app = FastAPI()

# Shared server-side state, mutated by the handlers below.
datasets = []       # deep-copied client models, one per received weight file
num_clients = 1     # number of averaging rounds to run in fed_avg()
model = None        # working model / iterative-process state
clients = [0] #this would be a list of all client ids

@app.get('/')
async def root():
    """Health-check endpoint confirming the server is reachable."""
    # Fixed SyntaxError: the original `async def root:` was missing the
    # parameter list parentheses, so the whole module failed to import.
    return 'root'

@app.get("/{model}/")
async def update_model(model: tf.keras.Model):
return model.load_weights('consolidated.h5')

@app.post("/weights")
async def load_weights(image: UploadFile = File(...)):
model.load_weights(image.filename)
datasets.append(copy.deepcopy(model))
load_data()
return {'consolidated weights': model}

async def load_data():
    """Request weights from every known client.

    NOTE(review): load_weights expects an UploadFile, not a client id, and
    itself calls load_data() — this pair looks mutually recursive and the
    argument type mismatched; confirm the intended request flow.
    """
    for client_id in clients:
        await load_weights(client_id)

async def fed_avg():
    """Run federated averaging over the collected client datasets and
    publish the consolidated weights."""
    await load_data()

    # NOTE(review): `model_examples` is not imported anywhere in this file —
    # confirm the intended module providing the model_fn.
    iterative_process = federated_averaging.build_federated_averaging_process(
        model_fn=model_examples.LinearRegression,
        client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.1))

    model = iterative_process.initialize()

    # Removed the pasted unittest assertions (self.assertIsInstance /
    # self.assertAllClose): there is no `self` or `model_utils` in this
    # scope, so they raised NameError on every call.

    # One averaging round per expected client.
    for _ in range(num_clients):
        model, _ = iterative_process.next(model, datasets)

    model.save_weights('consolidated.h5', save_format = 'h5')
    # update_model is a coroutine; await so the publish step actually runs.
    await update_model(model)

29 changes: 29 additions & 0 deletions make_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 9 16:41:10 2020

@author: Emi Z Liu
"""
import tensorflow as tf
import tensorflow_probability as tfp

# set up lightweight bayesian neural network
def sample_net(input_shape,
               activation = tf.nn.relu,
               batch_size = None,
               filters = 8):
    """Build a one-layer lightweight Bayesian conv net.

    Parameters
    ----------
    input_shape : tuple
        Shape of a single input sample (excluding batch dimension).
    activation : callable
        Activation applied both inside the conv layer and after it.
    batch_size : int or None
        Optional fixed batch size for the input layer.
    filters : int
        Number of convolution filters. New defaulted parameter:
        Convolution3DReparameterization requires `filters` as its first
        argument, so the original call raised TypeError.

    Returns
    -------
    tf.keras.Model mapping the input tensor to the layer's output.
    """
    def one_layer(x, dilation_rate=(1, 1, 1)):
        # Bayesian 3-D convolution whose kernel is sampled via the
        # reparameterization trick.
        x = tfp.layers.Convolution3DReparameterization(
            filters,
            kernel_size=3,
            padding="same",
            dilation_rate=dilation_rate,
            activation=activation,
            name="layer/vwnconv3d",
        )(x)
        x = tf.keras.layers.Activation(activation, name="layer/activation")(x)
        return x

    # tf.keras.Input (capital I) is the functional-API entry point;
    # tf.keras.layers.input does not exist and raised AttributeError.
    inputs = tf.keras.Input(shape = input_shape, batch_size=batch_size, name="inputs")
    x = one_layer(inputs)

    return tf.keras.Model(inputs=inputs, outputs=x)
Loading