Vous pouvez utiliser les formats de données d'entrée et de sortie suivants pour les modèles Python Decision Optimization.
Données d'entrée
Pour les modèles Python DOcplex, les données d'entrée peuvent être lues à partir d'un fichier dans n'importe quel format. Si vous souhaitez utiliser des sources externes pour alimenter vos données d'entrée à l'aide de connecteurs, vous devez utiliser des fichiers .csv
pour vos données d'entrée.
get_all_inputs
pour lire des fichiers et renvoyer un dictionnaire d'images de données.from docplex.util.environment import get_environment
import pandas
from six import iteritems
from collections.abc import Mapping
from os.path import join, dirname, basename, splitext, exists
import glob
class _InputDict(dict):
def __init__(self, directory, names):
dict.__init__(self)
self._directory = directory
for k in names:
dict.__setitem__(self, k, None)
file='model_schema.json'
if self._directory is not None:
file = "{0}/".format(self._directory) + file
self.dtype_schemas = self.get_dtype_schemas( file)
def __getitem__(self, key):
if isinstance(key, str):
item = dict.__getitem__(self, key)
if item is None:
file = "{0}.csv".format(key)
if file in self.dtype_schemas:
return self.read_df( key, dtype=self.dtype_schemas[file])
else:
return self.read_df( key)
else:
return item
else:
raise Exception("Accessing input dict via non string index")
def read_df(self, key, **kwargs):
env = get_environment()
file = "{0}.csv".format(key)
if self._directory is not None:
file = "{0}/".format(self._directory) + file
with env.get_input_stream(file) as ist:
params = {'encoding': 'utf8'}
if kwargs:
params.update(kwargs)
df = pandas.read_csv( ist, **params)
dict.__setitem__(self, key, df)
return df
def get_dtype_schemas(self, path):
dtype_schemas = {}
if exists(path):
input_schemas=json.load(open(path))
if 'input' in input_schemas:
for input_schema in input_schemas['input']:
dtype_schema = {}
if 'fields' in input_schema:
for input_schema_field in input_schema['fields']:
if input_schema_field['type']=='string':
dtype_schema[input_schema_field['name']]='str'
if len(dtype_schema) > 0:
dtype_schemas[input_schema['id']]=dtype_schema
print(dtype_schemas)
return dtype_schemas
class _LazyDict(Mapping):
def __init__(self, *args, **kw):
self._raw_dict = _InputDict(*args, **kw)
def __getitem__(self, key):
return self._raw_dict.__getitem__(key)
def __iter__(self):
return iter(self._raw_dict)
def __len__(self):
return len(self._raw_dict)
def read_df(self, key, **kwargs):
return self._raw_dict.read_df(key, **kwargs)
def get_all_inputs(directory=None):
'''Utility method to read a list of files and return a tuple with all
read data frames.
Returns:
a map { datasetname: data frame }
'''
all_csv = "*.csv"
g = join(directory, all_csv) if directory else all_csv
names = [splitext(basename(f))[0] for f in glob.glob(g)]
result = _LazyDict(directory, names)
return result
Données de sortie
def write_all_outputs(outputs):
'''Write all dataframes in ``outputs`` as .csv.
Args:
outputs: The map of outputs 'outputname' -> 'output df'
'''
for (name, df) in iteritems(outputs):
csv_file = '%s.csv' % name
print(csv_file)
with get_environment().get_output_stream(csv_file) as fp:
if sys.version_info[0] < 3:
fp.write(df.to_csv(index=False, encoding='utf8'))
else:
fp.write(df.to_csv(index=False).encode(encoding='utf8'))
if len(outputs) == 0:
print("Warning: no outputs written")
Vous pouvez également utiliser le get_environment().get_output_stream(csv_file) as fp:
pour enregistrer vos sorties.
Pour un exemple complet de déploiement d'un modèle Decision Optimization Python DOcplex, voir l'exemple Deploying a DO model with WML situé dans le dossier jupyter du DO-samples. Sélectionnez le sous-dossier correspondant au produit et à la version en question.
Cet exemple peut également être trouvé dans le Cloud Pak for Data Resource hub, voir Déploiement d'un modèle d'Decision Optimization dans watsonx.ai Runtime.